This is an automated email from the ASF dual-hosted git repository.
ddanielr pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new cf3792a945 Change scan-server-refs prefix and put UUID first (#4682)
cf3792a945 is described below
commit cf3792a945bde80bd00d6393e99f18b0a664a161
Author: Daniel Roberts <[email protected]>
AuthorDate: Fri Jun 21 17:03:59 2024 -0400
Change scan-server-refs prefix and put UUID first (#4682)
* Modify scan-server-refs to put UUID first
* Moves the scan server refs to a different prefix to avoid compatibility
  issues with possible downgrades
* Modifies the Scan Server Reference format to put the UUID
first in the row to reduce metadata tablet contention on writes
* Backported the UuidUtil class from elasticity (commit e68f9dd)
---------
Co-authored-by: Keith Turner <[email protected]>
---
.../core/metadata/ScanServerRefTabletFile.java | 102 +++++++++++++++++----
.../core/metadata/schema/MetadataSchema.java | 14 +++
.../org/apache/accumulo/core/util/UuidUtil.java | 46 ++++++++++
.../accumulo/server/metadata/ServerAmpleImpl.java | 31 +++----
.../server/util/ScanServerMetadataEntries.java | 5 +-
.../org/apache/accumulo/tserver/ScanServer.java | 14 ++-
.../test/ScanServerMetadataEntriesCleanIT.java | 50 +++++++++-
.../accumulo/test/ScanServerMetadataEntriesIT.java | 7 +-
8 files changed, 218 insertions(+), 51 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/metadata/ScanServerRefTabletFile.java
b/core/src/main/java/org/apache/accumulo/core/metadata/ScanServerRefTabletFile.java
index ad169ca396..1d63fbf25d 100644
---
a/core/src/main/java/org/apache/accumulo/core/metadata/ScanServerRefTabletFile.java
+++
b/core/src/main/java/org/apache/accumulo/core/metadata/ScanServerRefTabletFile.java
@@ -22,38 +22,100 @@ import java.net.URI;
import java.util.Objects;
import java.util.UUID;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.OldScanServerFileReferenceSection;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.ScanServerFileReferenceSection;
+import org.apache.accumulo.core.util.UuidUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
+import com.google.common.base.Preconditions;
+
public class ScanServerRefTabletFile extends TabletFile {
+ @SuppressWarnings("deprecation")
+ private static final String OLD_PREFIX =
OldScanServerFileReferenceSection.getRowPrefix();
+ private final String prefix;
private final Value NULL_VALUE = new Value(new byte[0]);
- private final Text colf;
- private final Text colq;
+ private final Text serverAddress;
+ private final Text uuid;
- public ScanServerRefTabletFile(String file, String serverAddress, UUID
serverLockUUID) {
+ public ScanServerRefTabletFile(UUID serverLockUUID, String serverAddress,
String file) {
super(new Path(URI.create(file)));
- this.colf = new Text(serverAddress);
- this.colq = new Text(serverLockUUID.toString());
+ // For new data, always use the current prefix
+ prefix = ScanServerFileReferenceSection.getRowPrefix();
+ this.serverAddress = new Text(serverAddress);
+ uuid = new Text(serverLockUUID.toString());
}
- public ScanServerRefTabletFile(String file, Text colf, Text colq) {
- super(new Path(URI.create(file)));
- this.colf = colf;
- this.colq = colq;
+ public ScanServerRefTabletFile(Key k) {
+ super(new Path(URI.create(extractFile(k))));
+ serverAddress = k.getColumnFamily();
+ if (isOldPrefix(k)) {
+ prefix = OLD_PREFIX;
+ uuid = new Text(k.getColumnQualifier().toString());
+ } else {
+ prefix = ScanServerFileReferenceSection.getRowPrefix();
+ var row = k.getRow().toString();
+ Preconditions.checkArgument(row.startsWith(prefix), "Unexpected row
prefix %s ", row);
+ var uuidStr = row.substring(prefix.length());
+ Preconditions.checkArgument(UuidUtil.isUUID(uuidStr, 0), "Row suffix is
not uuid %s", row);
+ uuid = new Text(uuidStr);
+ }
+ }
+
+ public Mutation putMutation() {
+ // Only write scan refs in the new format
+ Mutation mutation = new Mutation(prefix + uuid.toString());
+ mutation.put(serverAddress, getFilePath(), getValue());
+ return mutation;
+ }
+
+ public Mutation putDeleteMutation() {
+ Mutation mutation;
+ if (Objects.equals(prefix, OLD_PREFIX)) {
+ mutation = new Mutation(prefix + this.getPath().toString());
+ mutation.putDelete(serverAddress, uuid);
+ } else {
+ mutation = new Mutation(prefix + uuid.toString());
+ mutation.putDelete(serverAddress, getFilePath());
+ }
+ return mutation;
+ }
+
+ private static String extractFile(Key k) {
+ if (isOldPrefix(k)) {
+ return k.getRow().toString().substring(OLD_PREFIX.length());
+ } else {
+ return k.getColumnQualifier().toString();
+ }
+ }
+
+ /**
+ * Returns the correctly formatted range for a unique uuid
+ *
+ * @param uuid ServerLockUUID of a Scan Server
+ * @return Range for a single scan server
+ */
+ public static Range getRange(UUID uuid) {
+ Objects.requireNonNull(uuid);
+ return new
Range(MetadataSchema.ScanServerFileReferenceSection.getRowPrefix() + uuid);
}
- public String getRowSuffix() {
- return this.getPathStr();
+ private static boolean isOldPrefix(Key k) {
+ return k.getRow().toString().startsWith(OLD_PREFIX);
}
- public Text getServerAddress() {
- return this.colf;
+ public UUID getServerLockUUID() {
+ return UUID.fromString(uuid.toString());
}
- public Text getServerLockUUID() {
- return this.colq;
+ public Text getFilePath() {
+ return new Text(this.getPath().toString());
}
public Value getValue() {
@@ -64,8 +126,8 @@ public class ScanServerRefTabletFile extends TabletFile {
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
- result = prime * result + ((colf == null) ? 0 : colf.hashCode());
- result = prime * result + ((colq == null) ? 0 : colq.hashCode());
+ result = prime * result + ((serverAddress == null) ? 0 :
serverAddress.hashCode());
+ result = prime * result + ((uuid == null) ? 0 : uuid.hashCode());
return result;
}
@@ -81,13 +143,13 @@ public class ScanServerRefTabletFile extends TabletFile {
return false;
}
ScanServerRefTabletFile other = (ScanServerRefTabletFile) obj;
- return Objects.equals(colf, other.colf) && Objects.equals(colq,
other.colq);
+ return Objects.equals(serverAddress, other.serverAddress) &&
Objects.equals(uuid, other.uuid);
}
@Override
public String toString() {
- return "ScanServerRefTabletFile [file=" + this.getRowSuffix() + ", server
address=" + colf
- + ", server lock uuid=" + colq + "]";
+ return "ScanServerRefTabletFile [file=" + this.getPath().toString() + ",
server address="
+ + serverAddress + ", server lock uuid=" + uuid + "]";
}
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
index b1b306b083..5bb4e3a8ca 100644
---
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
+++
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
@@ -488,6 +488,20 @@ public class MetadataSchema {
}
public static class ScanServerFileReferenceSection {
+ private static final Section section =
+ new Section(RESERVED_PREFIX + "scanfileref", true, RESERVED_PREFIX +
"scanfilereg", false);
+
+ public static Range getRange() {
+ return section.getRange();
+ }
+
+ public static String getRowPrefix() {
+ return section.getRowPrefix();
+ }
+ }
+
+ @Deprecated(since = "2.1")
+ public static class OldScanServerFileReferenceSection {
private static final Section section =
new Section(RESERVED_PREFIX + "sserv", true, RESERVED_PREFIX +
"sserx", false);
diff --git a/core/src/main/java/org/apache/accumulo/core/util/UuidUtil.java
b/core/src/main/java/org/apache/accumulo/core/util/UuidUtil.java
new file mode 100644
index 0000000000..83d41abe0f
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/util/UuidUtil.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.util;
+
+public class UuidUtil {
+ /**
+ * A fast method for verifying a suffix of a string looks like a uuid.
+ *
+ * @param offset location where the uuid starts. It is expected that the uuid
occupies the rest of the
+ * string.
+ */
+ public static boolean isUUID(String uuid, int offset) {
+ if (uuid.length() - offset != 36) {
+ return false;
+ }
+ for (int i = 0; i < 36; i++) {
+ var c = uuid.charAt(i + offset);
+ if (i == 8 || i == 13 || i == 18 || i == 23) {
+ if (c != '-') {
+ // expect '-' char at above positions, did not see it
+ return false;
+ }
+ } else if (c < '0' || (c > '9' && c < 'A') || (c > 'F' && c < 'a') || c
> 'f') {
+ // expected hex at all other positions, did not see hex chars
+ return false;
+ }
+ }
+ return true;
+ }
+}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
index ef02f22b4d..ed531167ca 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
@@ -33,6 +33,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
+import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.IsolatedScanner;
import org.apache.accumulo.core.client.MutationsRejectedException;
@@ -60,6 +61,7 @@ import
org.apache.accumulo.core.metadata.schema.MetadataSchema.BlipSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.DeletesSection;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.DeletesSection.SkewedKeyValue;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.ExternalCompactionSection;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.OldScanServerFileReferenceSection;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.ScanServerFileReferenceSection;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
import org.apache.accumulo.core.security.Authorizations;
@@ -345,11 +347,8 @@ public class ServerAmpleImpl extends AmpleImpl implements
Ample {
@Override
public void putScanServerFileReferences(Collection<ScanServerRefTabletFile>
scanRefs) {
try (BatchWriter writer =
context.createBatchWriter(DataLevel.USER.metaTable())) {
- String prefix = ScanServerFileReferenceSection.getRowPrefix();
for (ScanServerRefTabletFile ref : scanRefs) {
- Mutation m = new Mutation(prefix + ref.getRowSuffix());
- m.put(ref.getServerAddress(), ref.getServerLockUUID(), ref.getValue());
- writer.addMutation(m);
+ writer.addMutation(ref.putMutation());
}
} catch (MutationsRejectedException | TableNotFoundException e) {
throw new IllegalStateException(
@@ -358,14 +357,15 @@ public class ServerAmpleImpl extends AmpleImpl implements
Ample {
}
@Override
+ @SuppressWarnings("deprecation")
public Stream<ScanServerRefTabletFile> getScanServerFileReferences() {
try {
- Scanner scanner = context.createScanner(DataLevel.USER.metaTable(),
Authorizations.EMPTY);
- scanner.setRange(ScanServerFileReferenceSection.getRange());
- int pLen = ScanServerFileReferenceSection.getRowPrefix().length();
+ BatchScanner scanner =
+ context.createBatchScanner(DataLevel.USER.metaTable(),
Authorizations.EMPTY);
+ scanner.setRanges(Set.of(ScanServerFileReferenceSection.getRange(),
+ OldScanServerFileReferenceSection.getRange()));
return StreamSupport.stream(scanner.spliterator(), false)
- .map(e -> new
ScanServerRefTabletFile(e.getKey().getRowData().toString().substring(pLen),
- e.getKey().getColumnFamily(), e.getKey().getColumnQualifier()));
+ .map(e -> new ScanServerRefTabletFile(e.getKey()));
} catch (TableNotFoundException e) {
throw new IllegalStateException(DataLevel.USER.metaTable() + " not
found!", e);
}
@@ -377,14 +377,10 @@ public class ServerAmpleImpl extends AmpleImpl implements
Ample {
Objects.requireNonNull(scanServerLockUUID, "Server uuid must be supplied");
try (
Scanner scanner = context.createScanner(DataLevel.USER.metaTable(),
Authorizations.EMPTY)) {
- scanner.setRange(ScanServerFileReferenceSection.getRange());
- scanner.fetchColumn(new Text(serverAddress), new
Text(scanServerLockUUID.toString()));
+ scanner.setRange(ScanServerRefTabletFile.getRange(scanServerLockUUID));
- int pLen = ScanServerFileReferenceSection.getRowPrefix().length();
Set<ScanServerRefTabletFile> refsToDelete =
StreamSupport.stream(scanner.spliterator(), false)
- .map(e -> new
ScanServerRefTabletFile(e.getKey().getRowData().toString().substring(pLen),
- e.getKey().getColumnFamily(), e.getKey().getColumnQualifier()))
- .collect(Collectors.toSet());
+ .map(e -> new
ScanServerRefTabletFile(e.getKey())).collect(Collectors.toSet());
if (!refsToDelete.isEmpty()) {
this.deleteScanServerFileReferences(refsToDelete);
@@ -397,11 +393,8 @@ public class ServerAmpleImpl extends AmpleImpl implements
Ample {
@Override
public void
deleteScanServerFileReferences(Collection<ScanServerRefTabletFile>
refsToDelete) {
try (BatchWriter writer =
context.createBatchWriter(DataLevel.USER.metaTable())) {
- String prefix = ScanServerFileReferenceSection.getRowPrefix();
for (ScanServerRefTabletFile ref : refsToDelete) {
- Mutation m = new Mutation(prefix + ref.getRowSuffix());
- m.putDelete(ref.getServerAddress(), ref.getServerLockUUID());
- writer.addMutation(m);
+ writer.addMutation(ref.putDeleteMutation());
}
log.debug("Deleted scan server file reference entries for files: {}",
refsToDelete);
} catch (MutationsRejectedException | TableNotFoundException e) {
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/ScanServerMetadataEntries.java
b/server/base/src/main/java/org/apache/accumulo/server/util/ScanServerMetadataEntries.java
index 61e3244c9a..d9beb30f50 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/util/ScanServerMetadataEntries.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/util/ScanServerMetadataEntries.java
@@ -40,7 +40,7 @@ public class ScanServerMetadataEntries {
// collect all uuids that are currently in the metadata table
context.getAmple().getScanServerFileReferences().forEach(ssrtf -> {
- uuidsToDelete.add(UUID.fromString(ssrtf.getServerLockUUID().toString()));
+ uuidsToDelete.add(ssrtf.getServerLockUUID());
});
// gather the list of current live scan servers, its important that this
is done after the above
@@ -56,8 +56,7 @@ public class ScanServerMetadataEntries {
final Set<ScanServerRefTabletFile> refsToDelete = new HashSet<>();
context.getAmple().getScanServerFileReferences().forEach(ssrtf -> {
-
- var uuid = UUID.fromString(ssrtf.getServerLockUUID().toString());
+ var uuid = ssrtf.getServerLockUUID();
if (uuidsToDelete.contains(uuid)) {
refsToDelete.add(ssrtf);
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index 2216b55489..d3c61fdd0d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -425,9 +425,13 @@ public class ScanServer extends AbstractServer
LOG.info("Stopping Thrift Servers");
address.server.stop();
- LOG.info("Removing server scan references");
-
this.getContext().getAmple().deleteScanServerFileReferences(clientAddress.toString(),
- serverLockUUID);
+ try {
+ LOG.info("Removing server scan references");
+
this.getContext().getAmple().deleteScanServerFileReferences(clientAddress.toString(),
+ serverLockUUID);
+ } catch (Exception e) {
+ LOG.warn("Failed to remove scan server refs from metadata location",
e);
+ }
try {
LOG.debug("Closing filesystems");
@@ -625,7 +629,7 @@ public class ScanServer extends AbstractServer
for (StoredTabletFile file : allFiles.keySet()) {
if (!reservedFiles.containsKey(file)) {
- refs.add(new ScanServerRefTabletFile(file.getPathStr(),
serverAddress, serverLockUUID));
+ refs.add(new ScanServerRefTabletFile(serverLockUUID, serverAddress,
file.getPathStr()));
filesToReserve.add(file);
tabletsToCheck.add(Objects.requireNonNull(allFiles.get(file)));
LOG.trace("RFFS {} need to add scan ref for file {}",
myReservationId, file);
@@ -829,7 +833,7 @@ public class ScanServer extends AbstractServer
influxFiles.add(file);
confirmed.add(file);
refsToDelete
- .add(new ScanServerRefTabletFile(file.getPathStr(),
serverAddress, serverLockUUID));
+ .add(new ScanServerRefTabletFile(serverLockUUID,
serverAddress, file.getPathStr()));
// remove the entry from the map while holding the write lock
ensuring no new
// reservations are added to the map values while the metadata
operation to delete is
diff --git
a/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesCleanIT.java
b/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesCleanIT.java
index 6cd6ba7f03..da7e9228da 100644
---
a/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesCleanIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesCleanIT.java
@@ -27,7 +27,13 @@ import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.metadata.ScanServerRefTabletFile;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.OldScanServerFileReferenceSection;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.server.ServerContext;
@@ -57,7 +63,7 @@ public class ScanServerMetadataEntriesCleanIT extends
SharedMiniClusterBase {
Set<ScanServerRefTabletFile> scanRefs = Stream.of("F0000070.rf",
"F0000071.rf")
.map(f -> "hdfs://localhost:8020/accumulo/tables/2a/default_tablet/" +
f)
- .map(f -> new ScanServerRefTabletFile(f, server.toString(),
serverLockUUID))
+ .map(f -> new ScanServerRefTabletFile(serverLockUUID,
server.toString(), f))
.collect(Collectors.toSet());
ServerContext ctx = getCluster().getServerContext();
@@ -72,4 +78,46 @@ public class ScanServerMetadataEntriesCleanIT extends
SharedMiniClusterBase {
ScanServerMetadataEntries.clean(ctx);
assertFalse(ctx.getAmple().getScanServerFileReferences().findAny().isPresent());
}
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void testOldScanServerRefs() {
+ HostAndPort server = HostAndPort.fromParts("127.0.0.1", 1234);
+ UUID serverLockUUID = UUID.randomUUID();
+
+ Set<ScanServerRefTabletFile> scanRefs = Stream.of("F0001270.rf",
"F0001271.rf")
+ .map(f -> "hdfs://localhost:8020/accumulo/tables/2a/test_tablet/" + f)
+ .map(f -> new ScanServerRefTabletFile(serverLockUUID,
server.toString(), f))
+ .collect(Collectors.toSet());
+
+ ServerContext ctx = getCluster().getServerContext();
+ ctx.getAmple().putScanServerFileReferences(scanRefs);
+
+ assertEquals(scanRefs.size(),
ctx.getAmple().getScanServerFileReferences().count());
+
+ // Add old scan server entries
+ try (BatchWriter writer =
ctx.createBatchWriter(Ample.DataLevel.USER.metaTable())) {
+ String prefix = OldScanServerFileReferenceSection.getRowPrefix();
+ for (String filepath : Stream.of("F0001243.rf", "F0006512.rf")
+ .map(f -> "hdfs://localhost:8020/accumulo/tables/2a/test_tablet/" +
f)
+ .collect(Collectors.toSet())) {
+ Mutation m = new Mutation(prefix + filepath);
+ m.put(server.toString(), serverLockUUID.toString(), "");
+ writer.addMutation(m);
+ }
+ writer.flush();
+ } catch (MutationsRejectedException | TableNotFoundException e) {
+ throw new IllegalStateException(
+ "Error inserting scan server file references into " +
Ample.DataLevel.USER.metaTable(),
+ e);
+ }
+
+ // Ensure that ample returns all references from both ranges
+ assertEquals(scanRefs.size() + 2,
ctx.getAmple().getScanServerFileReferences().count());
+
+ // Delete all references
+ ctx.getAmple().deleteScanServerFileReferences(
+
ctx.getAmple().getScanServerFileReferences().collect(Collectors.toSet()));
+ assertEquals(0, ctx.getAmple().getScanServerFileReferences().count());
+ }
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java
b/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java
index 7fbe060d44..e1c505ea72 100644
---
a/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java
@@ -50,6 +50,7 @@ import
org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.gc.Reference;
import org.apache.accumulo.core.metadata.ScanServerRefTabletFile;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.ScanServerFileReferenceSection;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.HostAndPort;
@@ -111,7 +112,7 @@ public class ScanServerMetadataEntriesIT extends
SharedMiniClusterBase {
Set<ScanServerRefTabletFile> scanRefs = Stream.of("F0000070.rf",
"F0000071.rf")
.map(f -> "hdfs://localhost:8020/accumulo/tables/2a/default_tablet/" +
f)
- .map(f -> new ScanServerRefTabletFile(f, server.toString(),
serverLockUUID))
+ .map(f -> new ScanServerRefTabletFile(serverLockUUID,
server.toString(), f))
.collect(Collectors.toSet());
ServerContext ctx = getCluster().getServerContext();
@@ -242,8 +243,8 @@ public class ScanServerMetadataEntriesIT extends
SharedMiniClusterBase {
Set<String> metadataScanFileRefs = new HashSet<>();
metadataEntries.forEach(m -> {
String row = m.getKey().getRow().toString();
- assertTrue(row.startsWith("~sserv"));
- String file =
row.substring(ScanServerFileReferenceSection.getRowPrefix().length());
+
assertTrue(row.startsWith(MetadataSchema.ScanServerFileReferenceSection.getRowPrefix()));
+ String file = m.getKey().getColumnQualifier().toString();
metadataScanFileRefs.add(file);
});
assertEquals(fileCount, metadataScanFileRefs.size());