This is an automated email from the ASF dual-hosted git repository. ddanielr pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new cf3792a945 Change scan-server-refs prefix and put UUID first (#4682) cf3792a945 is described below commit cf3792a945bde80bd00d6393e99f18b0a664a161 Author: Daniel Roberts <ddani...@gmail.com> AuthorDate: Fri Jun 21 17:03:59 2024 -0400 Change scan-server-refs prefix and put UUID first (#4682) * Modify scan-server-refs to put UUID first * Moves the scan server refs to a different prefix to avoid compatibility issues with possible downgrades * Modifies the Scan Server Reference format to put the UUID first in the row to reduce metadata tablet contention on writes * Backported UuidUtil from elasticity Backported the UuidUtil class from commit (e68f9dd) --------- Co-authored-by: Keith Turner <ktur...@apache.org> --- .../core/metadata/ScanServerRefTabletFile.java | 102 +++++++++++++++++---- .../core/metadata/schema/MetadataSchema.java | 14 +++ .../org/apache/accumulo/core/util/UuidUtil.java | 46 ++++++++++ .../accumulo/server/metadata/ServerAmpleImpl.java | 31 +++---- .../server/util/ScanServerMetadataEntries.java | 5 +- .../org/apache/accumulo/tserver/ScanServer.java | 14 ++- .../test/ScanServerMetadataEntriesCleanIT.java | 50 +++++++++- .../accumulo/test/ScanServerMetadataEntriesIT.java | 7 +- 8 files changed, 218 insertions(+), 51 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/ScanServerRefTabletFile.java b/core/src/main/java/org/apache/accumulo/core/metadata/ScanServerRefTabletFile.java index ad169ca396..1d63fbf25d 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/ScanServerRefTabletFile.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/ScanServerRefTabletFile.java @@ -22,38 +22,100 @@ import java.net.URI; import java.util.Objects; import java.util.UUID; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; 
+import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.OldScanServerFileReferenceSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.ScanServerFileReferenceSection; +import org.apache.accumulo.core.util.UuidUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; +import com.google.common.base.Preconditions; + public class ScanServerRefTabletFile extends TabletFile { + @SuppressWarnings("deprecation") + private static final String OLD_PREFIX = OldScanServerFileReferenceSection.getRowPrefix(); + private final String prefix; private final Value NULL_VALUE = new Value(new byte[0]); - private final Text colf; - private final Text colq; + private final Text serverAddress; + private final Text uuid; - public ScanServerRefTabletFile(String file, String serverAddress, UUID serverLockUUID) { + public ScanServerRefTabletFile(UUID serverLockUUID, String serverAddress, String file) { super(new Path(URI.create(file))); - this.colf = new Text(serverAddress); - this.colq = new Text(serverLockUUID.toString()); + // For new data, always use the current prefix + prefix = ScanServerFileReferenceSection.getRowPrefix(); + this.serverAddress = new Text(serverAddress); + uuid = new Text(serverLockUUID.toString()); } - public ScanServerRefTabletFile(String file, Text colf, Text colq) { - super(new Path(URI.create(file))); - this.colf = colf; - this.colq = colq; + public ScanServerRefTabletFile(Key k) { + super(new Path(URI.create(extractFile(k)))); + serverAddress = k.getColumnFamily(); + if (isOldPrefix(k)) { + prefix = OLD_PREFIX; + uuid = new Text(k.getColumnQualifier().toString()); + } else { + prefix = ScanServerFileReferenceSection.getRowPrefix(); + var row = k.getRow().toString(); + Preconditions.checkArgument(row.startsWith(prefix), "Unexpected row prefix %s ", row); + var uuidStr = row.substring(prefix.length()); + Preconditions.checkArgument(UuidUtil.isUUID(uuidStr, 
0), "Row suffix is not uuid %s", row); + uuid = new Text(uuidStr); + } + } + + public Mutation putMutation() { + // Only write scan refs in the new format + Mutation mutation = new Mutation(prefix + uuid.toString()); + mutation.put(serverAddress, getFilePath(), getValue()); + return mutation; + } + + public Mutation putDeleteMutation() { + Mutation mutation; + if (Objects.equals(prefix, OLD_PREFIX)) { + mutation = new Mutation(prefix + this.getPath().toString()); + mutation.putDelete(serverAddress, uuid); + } else { + mutation = new Mutation(prefix + uuid.toString()); + mutation.putDelete(serverAddress, getFilePath()); + } + return mutation; + } + + private static String extractFile(Key k) { + if (isOldPrefix(k)) { + return k.getRow().toString().substring(OLD_PREFIX.length()); + } else { + return k.getColumnQualifier().toString(); + } + } + + /** + * Returns the correctly formatted range for a unique uuid + * + * @param uuid ServerLockUUID of a Scan Server + * @return Range for a single scan server + */ + public static Range getRange(UUID uuid) { + Objects.requireNonNull(uuid); + return new Range(MetadataSchema.ScanServerFileReferenceSection.getRowPrefix() + uuid); } - public String getRowSuffix() { - return this.getPathStr(); + private static boolean isOldPrefix(Key k) { + return k.getRow().toString().startsWith(OLD_PREFIX); } - public Text getServerAddress() { - return this.colf; + public UUID getServerLockUUID() { + return UUID.fromString(uuid.toString()); } - public Text getServerLockUUID() { - return this.colq; + public Text getFilePath() { + return new Text(this.getPath().toString()); } public Value getValue() { @@ -64,8 +126,8 @@ public class ScanServerRefTabletFile extends TabletFile { public int hashCode() { final int prime = 31; int result = super.hashCode(); - result = prime * result + ((colf == null) ? 0 : colf.hashCode()); - result = prime * result + ((colq == null) ? 0 : colq.hashCode()); + result = prime * result + ((serverAddress == null) ? 
0 : serverAddress.hashCode()); + result = prime * result + ((uuid == null) ? 0 : uuid.hashCode()); return result; } @@ -81,13 +143,13 @@ public class ScanServerRefTabletFile extends TabletFile { return false; } ScanServerRefTabletFile other = (ScanServerRefTabletFile) obj; - return Objects.equals(colf, other.colf) && Objects.equals(colq, other.colq); + return Objects.equals(serverAddress, other.serverAddress) && Objects.equals(uuid, other.uuid); } @Override public String toString() { - return "ScanServerRefTabletFile [file=" + this.getRowSuffix() + ", server address=" + colf - + ", server lock uuid=" + colq + "]"; + return "ScanServerRefTabletFile [file=" + this.getPath().toString() + ", server address=" + + serverAddress + ", server lock uuid=" + uuid + "]"; } } diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java index b1b306b083..5bb4e3a8ca 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java @@ -488,6 +488,20 @@ public class MetadataSchema { } public static class ScanServerFileReferenceSection { + private static final Section section = + new Section(RESERVED_PREFIX + "scanfileref", true, RESERVED_PREFIX + "scanfilereg", false); + + public static Range getRange() { + return section.getRange(); + } + + public static String getRowPrefix() { + return section.getRowPrefix(); + } + } + + @Deprecated(since = "2.1") + public static class OldScanServerFileReferenceSection { private static final Section section = new Section(RESERVED_PREFIX + "sserv", true, RESERVED_PREFIX + "sserx", false); diff --git a/core/src/main/java/org/apache/accumulo/core/util/UuidUtil.java b/core/src/main/java/org/apache/accumulo/core/util/UuidUtil.java new file mode 100644 index 0000000000..83d41abe0f --- /dev/null +++ 
b/core/src/main/java/org/apache/accumulo/core/util/UuidUtil.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.util; + +public class UuidUtil { + /** + * A fast method for verifying a suffix of a string looks like a uuid. + * + * @param offset location where the uuid starts. It's expected the uuid occupies the rest of the + * string. 
+ */ + public static boolean isUUID(String uuid, int offset) { + if (uuid.length() - offset != 36) { + return false; + } + for (int i = 0; i < 36; i++) { + var c = uuid.charAt(i + offset); + if (i == 8 || i == 13 || i == 18 || i == 23) { + if (c != '-') { + // expect '-' char at above positions, did not see it + return false; + } + } else if (c < '0' || (c > '9' && c < 'A') || (c > 'F' && c < 'a') || c > 'f') { + // expected hex at all other positions, did not see hex chars + return false; + } + } + return true; + } +} diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java index ef02f22b4d..ed531167ca 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java @@ -33,6 +33,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; +import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.IsolatedScanner; import org.apache.accumulo.core.client.MutationsRejectedException; @@ -60,6 +61,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.BlipSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.DeletesSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.DeletesSection.SkewedKeyValue; import org.apache.accumulo.core.metadata.schema.MetadataSchema.ExternalCompactionSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.OldScanServerFileReferenceSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.ScanServerFileReferenceSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily; import org.apache.accumulo.core.security.Authorizations; @@ -345,11 +347,8 
@@ public class ServerAmpleImpl extends AmpleImpl implements Ample { @Override public void putScanServerFileReferences(Collection<ScanServerRefTabletFile> scanRefs) { try (BatchWriter writer = context.createBatchWriter(DataLevel.USER.metaTable())) { - String prefix = ScanServerFileReferenceSection.getRowPrefix(); for (ScanServerRefTabletFile ref : scanRefs) { - Mutation m = new Mutation(prefix + ref.getRowSuffix()); - m.put(ref.getServerAddress(), ref.getServerLockUUID(), ref.getValue()); - writer.addMutation(m); + writer.addMutation(ref.putMutation()); } } catch (MutationsRejectedException | TableNotFoundException e) { throw new IllegalStateException( @@ -358,14 +357,15 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample { } @Override + @SuppressWarnings("deprecation") public Stream<ScanServerRefTabletFile> getScanServerFileReferences() { try { - Scanner scanner = context.createScanner(DataLevel.USER.metaTable(), Authorizations.EMPTY); - scanner.setRange(ScanServerFileReferenceSection.getRange()); - int pLen = ScanServerFileReferenceSection.getRowPrefix().length(); + BatchScanner scanner = + context.createBatchScanner(DataLevel.USER.metaTable(), Authorizations.EMPTY); + scanner.setRanges(Set.of(ScanServerFileReferenceSection.getRange(), + OldScanServerFileReferenceSection.getRange())); return StreamSupport.stream(scanner.spliterator(), false) - .map(e -> new ScanServerRefTabletFile(e.getKey().getRowData().toString().substring(pLen), - e.getKey().getColumnFamily(), e.getKey().getColumnQualifier())); + .map(e -> new ScanServerRefTabletFile(e.getKey())); } catch (TableNotFoundException e) { throw new IllegalStateException(DataLevel.USER.metaTable() + " not found!", e); } @@ -377,14 +377,10 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample { Objects.requireNonNull(scanServerLockUUID, "Server uuid must be supplied"); try ( Scanner scanner = context.createScanner(DataLevel.USER.metaTable(), Authorizations.EMPTY)) { - 
scanner.setRange(ScanServerFileReferenceSection.getRange()); - scanner.fetchColumn(new Text(serverAddress), new Text(scanServerLockUUID.toString())); + scanner.setRange(ScanServerRefTabletFile.getRange(scanServerLockUUID)); - int pLen = ScanServerFileReferenceSection.getRowPrefix().length(); Set<ScanServerRefTabletFile> refsToDelete = StreamSupport.stream(scanner.spliterator(), false) - .map(e -> new ScanServerRefTabletFile(e.getKey().getRowData().toString().substring(pLen), - e.getKey().getColumnFamily(), e.getKey().getColumnQualifier())) - .collect(Collectors.toSet()); + .map(e -> new ScanServerRefTabletFile(e.getKey())).collect(Collectors.toSet()); if (!refsToDelete.isEmpty()) { this.deleteScanServerFileReferences(refsToDelete); @@ -397,11 +393,8 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample { @Override public void deleteScanServerFileReferences(Collection<ScanServerRefTabletFile> refsToDelete) { try (BatchWriter writer = context.createBatchWriter(DataLevel.USER.metaTable())) { - String prefix = ScanServerFileReferenceSection.getRowPrefix(); for (ScanServerRefTabletFile ref : refsToDelete) { - Mutation m = new Mutation(prefix + ref.getRowSuffix()); - m.putDelete(ref.getServerAddress(), ref.getServerLockUUID()); - writer.addMutation(m); + writer.addMutation(ref.putDeleteMutation()); } log.debug("Deleted scan server file reference entries for files: {}", refsToDelete); } catch (MutationsRejectedException | TableNotFoundException e) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ScanServerMetadataEntries.java b/server/base/src/main/java/org/apache/accumulo/server/util/ScanServerMetadataEntries.java index 61e3244c9a..d9beb30f50 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ScanServerMetadataEntries.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ScanServerMetadataEntries.java @@ -40,7 +40,7 @@ public class ScanServerMetadataEntries { // collect all uuids that are currently 
in the metadata table context.getAmple().getScanServerFileReferences().forEach(ssrtf -> { - uuidsToDelete.add(UUID.fromString(ssrtf.getServerLockUUID().toString())); + uuidsToDelete.add(ssrtf.getServerLockUUID()); }); // gather the list of current live scan servers, its important that this is done after the above @@ -56,8 +56,7 @@ public class ScanServerMetadataEntries { final Set<ScanServerRefTabletFile> refsToDelete = new HashSet<>(); context.getAmple().getScanServerFileReferences().forEach(ssrtf -> { - - var uuid = UUID.fromString(ssrtf.getServerLockUUID().toString()); + var uuid = ssrtf.getServerLockUUID(); if (uuidsToDelete.contains(uuid)) { refsToDelete.add(ssrtf); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 2216b55489..d3c61fdd0d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -425,9 +425,13 @@ public class ScanServer extends AbstractServer LOG.info("Stopping Thrift Servers"); address.server.stop(); - LOG.info("Removing server scan references"); - this.getContext().getAmple().deleteScanServerFileReferences(clientAddress.toString(), - serverLockUUID); + try { + LOG.info("Removing server scan references"); + this.getContext().getAmple().deleteScanServerFileReferences(clientAddress.toString(), + serverLockUUID); + } catch (Exception e) { + LOG.warn("Failed to remove scan server refs from metadata location", e); + } try { LOG.debug("Closing filesystems"); @@ -625,7 +629,7 @@ public class ScanServer extends AbstractServer for (StoredTabletFile file : allFiles.keySet()) { if (!reservedFiles.containsKey(file)) { - refs.add(new ScanServerRefTabletFile(file.getPathStr(), serverAddress, serverLockUUID)); + refs.add(new ScanServerRefTabletFile(serverLockUUID, serverAddress, file.getPathStr())); filesToReserve.add(file); 
tabletsToCheck.add(Objects.requireNonNull(allFiles.get(file))); LOG.trace("RFFS {} need to add scan ref for file {}", myReservationId, file); @@ -829,7 +833,7 @@ public class ScanServer extends AbstractServer influxFiles.add(file); confirmed.add(file); refsToDelete - .add(new ScanServerRefTabletFile(file.getPathStr(), serverAddress, serverLockUUID)); + .add(new ScanServerRefTabletFile(serverLockUUID, serverAddress, file.getPathStr())); // remove the entry from the map while holding the write lock ensuring no new // reservations are added to the map values while the metadata operation to delete is diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesCleanIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesCleanIT.java index 6cd6ba7f03..da7e9228da 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesCleanIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesCleanIT.java @@ -27,7 +27,13 @@ import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.metadata.ScanServerRefTabletFile; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.OldScanServerFileReferenceSection; import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.server.ServerContext; @@ -57,7 +63,7 @@ public class ScanServerMetadataEntriesCleanIT extends SharedMiniClusterBase { Set<ScanServerRefTabletFile> scanRefs = Stream.of("F0000070.rf", "F0000071.rf") .map(f -> "hdfs://localhost:8020/accumulo/tables/2a/default_tablet/" + f) - .map(f -> new 
ScanServerRefTabletFile(f, server.toString(), serverLockUUID)) + .map(f -> new ScanServerRefTabletFile(serverLockUUID, server.toString(), f)) .collect(Collectors.toSet()); ServerContext ctx = getCluster().getServerContext(); @@ -72,4 +78,46 @@ public class ScanServerMetadataEntriesCleanIT extends SharedMiniClusterBase { ScanServerMetadataEntries.clean(ctx); assertFalse(ctx.getAmple().getScanServerFileReferences().findAny().isPresent()); } + + @Test + @SuppressWarnings("deprecation") + public void testOldScanServerRefs() { + HostAndPort server = HostAndPort.fromParts("127.0.0.1", 1234); + UUID serverLockUUID = UUID.randomUUID(); + + Set<ScanServerRefTabletFile> scanRefs = Stream.of("F0001270.rf", "F0001271.rf") + .map(f -> "hdfs://localhost:8020/accumulo/tables/2a/test_tablet/" + f) + .map(f -> new ScanServerRefTabletFile(serverLockUUID, server.toString(), f)) + .collect(Collectors.toSet()); + + ServerContext ctx = getCluster().getServerContext(); + ctx.getAmple().putScanServerFileReferences(scanRefs); + + assertEquals(scanRefs.size(), ctx.getAmple().getScanServerFileReferences().count()); + + // Add old scan server entries + try (BatchWriter writer = ctx.createBatchWriter(Ample.DataLevel.USER.metaTable())) { + String prefix = OldScanServerFileReferenceSection.getRowPrefix(); + for (String filepath : Stream.of("F0001243.rf", "F0006512.rf") + .map(f -> "hdfs://localhost:8020/accumulo/tables/2a/test_tablet/" + f) + .collect(Collectors.toSet())) { + Mutation m = new Mutation(prefix + filepath); + m.put(server.toString(), serverLockUUID.toString(), ""); + writer.addMutation(m); + } + writer.flush(); + } catch (MutationsRejectedException | TableNotFoundException e) { + throw new IllegalStateException( + "Error inserting scan server file references into " + Ample.DataLevel.USER.metaTable(), + e); + } + + // Ensure that ample returns all references from both ranges + assertEquals(scanRefs.size() + 2, ctx.getAmple().getScanServerFileReferences().count()); + + // Delete all 
references + ctx.getAmple().deleteScanServerFileReferences( + ctx.getAmple().getScanServerFileReferences().collect(Collectors.toSet())); + assertEquals(0, ctx.getAmple().getScanServerFileReferences().count()); + } } diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java index 7fbe060d44..e1c505ea72 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java @@ -50,6 +50,7 @@ import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.gc.Reference; import org.apache.accumulo.core.metadata.ScanServerRefTabletFile; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.metadata.schema.MetadataSchema.ScanServerFileReferenceSection; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.HostAndPort; @@ -111,7 +112,7 @@ public class ScanServerMetadataEntriesIT extends SharedMiniClusterBase { Set<ScanServerRefTabletFile> scanRefs = Stream.of("F0000070.rf", "F0000071.rf") .map(f -> "hdfs://localhost:8020/accumulo/tables/2a/default_tablet/" + f) - .map(f -> new ScanServerRefTabletFile(f, server.toString(), serverLockUUID)) + .map(f -> new ScanServerRefTabletFile(serverLockUUID, server.toString(), f)) .collect(Collectors.toSet()); ServerContext ctx = getCluster().getServerContext(); @@ -242,8 +243,8 @@ public class ScanServerMetadataEntriesIT extends SharedMiniClusterBase { Set<String> metadataScanFileRefs = new HashSet<>(); metadataEntries.forEach(m -> { String row = m.getKey().getRow().toString(); - assertTrue(row.startsWith("~sserv")); - String file = row.substring(ScanServerFileReferenceSection.getRowPrefix().length()); + 
assertTrue(row.startsWith(MetadataSchema.ScanServerFileReferenceSection.getRowPrefix())); + String file = m.getKey().getColumnQualifier().toString(); metadataScanFileRefs.add(file); }); assertEquals(fileCount, metadataScanFileRefs.size());