This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
     new 7a27d40463 Offers new ways of computing bulk load plans (#4898)
7a27d40463 is described below

commit 7a27d4046323203926122f447009e36c1ae88c75
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Sat Feb 1 15:51:59 2025 -0500

    Offers new ways of computing bulk load plans (#4898)

    Two new ways of computing bulk import load plans are offered in this change.
    First, the RFile API was modified to support computing a LoadPlan as the RFile
    is written. Second, a new LoadPlan.compute() method was added that creates a
    LoadPlan from an existing RFile. In addition to these changes, methods were
    added to LoadPlan that support serializing and deserializing load plans
    to/from json.

    All of these changes together support the use case of computing load plans in
    a distributed manner. For example, with a bulk import directory of N files the
    following use case is now supported.

    1. For each file, a task is spun up on a remote server that calls the new
       LoadPlan.compute() API to determine what tablets the file overlaps. Then
       the new LoadPlan.toJson() method is called to serialize the load plan and
       send it to a central place.
    2. All the load plans from the remote servers are deserialized by calling the
       new LoadPlan.fromJson() method and merged into a single load plan that is
       used to do the bulk import.

    Another use case these new APIs could support is running this new code in the
    map reduce job that generates bulk import data.

    1. As each reducer writes to an rfile, it could also build a LoadPlan. After
       the rfile is closed, the load plan can be obtained from the writer,
       serialized using LoadPlan.toJson(), and the result saved to a file. So
       after the map reduce job completes, each rfile would have a corresponding
       file with a load plan for that file.
    2. Another process that runs after the map reduce job can load all the load
       plans from files and merge them using the new LoadPlan.fromJson() method.
       Then the merged LoadPlan can be used to do the bulk import.

    Both of these use cases avoid doing the analysis of files on the single
    machine doing the bulk import. Bulk import V1 had this functionality and
    would ask random tservers to do the file analysis, which could cause
    unexpected load on those tservers. Bulk V1 would also interleave analyzing
    files and adding them to tablets, which could lead to odd situations where
    analysis fails after a file had already been added to some tablets, leaving
    the file partially imported. Bulk v2 does all analysis b [...]
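As a rough, self-contained illustration of the first use case above (with the "remote" per-file tasks collapsed into a local loop for brevity), the sketch below strings the new APIs together. The table name, bulk directory, file names, and client.properties path are made-up placeholders, not part of this change; the API calls themselves (LoadPlan.compute, toJson, fromJson, SplitResolver.from, builder().addPlan, importDirectory().plan()) are the ones added or exercised in this commit.

```java
import java.net.URI;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.data.LoadPlan;
import org.apache.hadoop.io.Text;

public class DistributedLoadPlanSketch {

  public static void main(String[] args) throws Exception {
    String table = "mytable";                           // hypothetical table name
    String bulkDir = "hdfs://namenode/bulk-import-dir"; // hypothetical bulk import directory

    try (AccumuloClient client = Accumulo.newClient().from("client.properties").build()) {
      // fetch the table's split points once; a real job would ship these to each remote task
      SortedSet<Text> splits = new TreeSet<>(client.tableOperations().listSplits(table));
      var splitResolver = LoadPlan.SplitResolver.from(splits);

      // step 1: each of these calls would normally run as a task on a remote server,
      // producing one json load plan per rfile in the bulk directory
      List<String> jsonPlans = Stream.of("f1.rf", "f2.rf", "f3.rf")
          .map(f -> computePlanJson(bulkDir + "/" + f, splitResolver))
          .collect(Collectors.toList());

      // step 2: a central process deserializes and merges the per-file plans,
      // then uses the merged plan to do the bulk import
      var builder = LoadPlan.builder();
      jsonPlans.forEach(json -> builder.addPlan(LoadPlan.fromJson(json)));
      LoadPlan merged = builder.build();

      client.tableOperations().importDirectory(bulkDir).to(table).plan(merged).load();
    }
  }

  // what a single remote task would do for one rfile: compute the plan and serialize it
  private static String computePlanJson(String file, LoadPlan.SplitResolver splitResolver) {
    try {
      return LoadPlan.compute(new URI(file), splitResolver).toJson();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}
```

A companion sketch for the writer-side (map reduce) use case appears at the end of this message, after the diff.
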
Co-authored-by: Daniel Roberts <ddani...@gmail.com> Co-authored-by: Christopher Tubbs <ctubb...@apache.org> --- .../java/org/apache/accumulo/core/Constants.java | 1 + .../core/client/rfile/LoadPlanCollector.java | 131 +++++++++++ .../apache/accumulo/core/client/rfile/RFile.java | 10 + .../accumulo/core/client/rfile/RFileWriter.java | 36 ++- .../core/client/rfile/RFileWriterBuilder.java | 35 ++- .../accumulo/core/clientImpl/bulk/BulkImport.java | 39 +++- .../org/apache/accumulo/core/data/LoadPlan.java | 242 ++++++++++++++++++++- .../apache/accumulo/core/file/FileOperations.java | 5 + .../core/client/rfile/RFileClientTest.java | 210 ++++++++++++++++++ .../apache/accumulo/core/data/LoadPlanTest.java | 48 ++++ .../apache/accumulo/core/file/rfile/RFileTest.java | 1 - .../apache/accumulo/test/functional/BulkNewIT.java | 61 +++++- 12 files changed, 792 insertions(+), 27 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java index c31e205585..1caf157e90 100644 --- a/core/src/main/java/org/apache/accumulo/core/Constants.java +++ b/core/src/main/java/org/apache/accumulo/core/Constants.java @@ -104,6 +104,7 @@ public class Constants { public static final String BULK_PREFIX = "b-"; public static final String BULK_RENAME_FILE = "renames.json"; public static final String BULK_LOAD_MAPPING = "loadmap.json"; + public static final String BULK_WORKING_PREFIX = "accumulo-bulk-"; public static final String CLONE_PREFIX = "c-"; public static final byte[] CLONE_PREFIX_BYTES = CLONE_PREFIX.getBytes(UTF_8); diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/LoadPlanCollector.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/LoadPlanCollector.java new file mode 100644 index 0000000000..511e3fed51 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/LoadPlanCollector.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.core.client.rfile; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.LoadPlan; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.hadoop.io.Text; + +import com.google.common.base.Preconditions; + +class LoadPlanCollector { + + private final LoadPlan.SplitResolver splitResolver; + private boolean finished = false; + private Text lgFirstRow; + private Text lgLastRow; + private Text firstRow; + private Text lastRow; + private Set<KeyExtent> overlappingExtents; + private KeyExtent currentExtent; + private long appended = 0; + + LoadPlanCollector(LoadPlan.SplitResolver splitResolver) { + this.splitResolver = splitResolver; + this.overlappingExtents = new HashSet<>(); + } + + LoadPlanCollector() { + splitResolver = null; + this.overlappingExtents = null; + + } + + private void appendNoSplits(Key key) { + if (lgFirstRow == null) { + lgFirstRow = key.getRow(); + lgLastRow = lgFirstRow; + } else { + var row = key.getRow(); + lgLastRow = row; + } + } + + private static final TableId FAKE_ID = TableId.of("123"); + + private void appendSplits(Key key) { + var row = key.getRow(); + if (currentExtent == null || !currentExtent.contains(row)) { + var tableSplits = splitResolver.apply(row); + var extent = new KeyExtent(FAKE_ID, tableSplits.getEndRow(), tableSplits.getPrevRow()); + Preconditions.checkState(extent.contains(row), "%s does not contain %s", tableSplits, row); + if (currentExtent != null) { + overlappingExtents.add(currentExtent); + } + currentExtent = extent; + } + } + + public void append(Key key) { + if (splitResolver == null) { + appendNoSplits(key); + } else { + appendSplits(key); + } + appended++; + } + + public void startLocalityGroup() { + if (lgFirstRow != null) { + if (firstRow == null) { + firstRow = lgFirstRow; + lastRow = lgLastRow; + } else { + // take the minimum + firstRow = firstRow.compareTo(lgFirstRow) < 0 ? firstRow : lgFirstRow; + // take the maximum + lastRow = lastRow.compareTo(lgLastRow) > 0 ? 
lastRow : lgLastRow; + } + lgFirstRow = null; + lgLastRow = null; + } + } + + public LoadPlan getLoadPlan(String filename) { + Preconditions.checkState(finished, "Attempted to get load plan before closing"); + + if (appended == 0) { + return LoadPlan.builder().build(); + } + + if (splitResolver == null) { + return LoadPlan.builder().loadFileTo(filename, LoadPlan.RangeType.FILE, firstRow, lastRow) + .build(); + } else { + var builder = LoadPlan.builder(); + overlappingExtents.add(currentExtent); + for (var extent : overlappingExtents) { + builder.loadFileTo(filename, LoadPlan.RangeType.TABLE, extent.prevEndRow(), + extent.endRow()); + } + return builder.build(); + } + } + + public void close() { + finished = true; + // compute the overall min and max rows + startLocalityGroup(); + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java index 3b6d10aade..71c186f3eb 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java @@ -34,6 +34,7 @@ import org.apache.accumulo.core.client.summary.Summary; import org.apache.accumulo.core.client.summary.Summary.FileStatistics; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.LoadPlan; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.security.Authorizations; import org.apache.hadoop.fs.FileSystem; @@ -428,6 +429,15 @@ public class RFile { */ WriterOptions withVisibilityCacheSize(int maxSize); + /** + * @param splitResolver builds a {@link LoadPlan} using table split points provided by the given + * splitResolver. + * @return this + * @see RFileWriter#getLoadPlan(String) + * @since 2.1.4 + */ + WriterOptions withSplitResolver(LoadPlan.SplitResolver splitResolver); + /** * @return a new RfileWriter created with the options previously specified. 
*/ diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java index b4d6def4a2..ffa43e3bc4 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java @@ -29,6 +29,7 @@ import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.data.ArrayByteSequence; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.LoadPlan; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.FileSKVWriter; import org.apache.accumulo.core.security.ColumnVisibility; @@ -92,12 +93,15 @@ public class RFileWriter implements AutoCloseable { private final FileSKVWriter writer; private final LRUMap<ByteSequence,Boolean> validVisibilities; + + private final LoadPlanCollector loadPlanCollector; private boolean startedLG; private boolean startedDefaultLG; - RFileWriter(FileSKVWriter fileSKVWriter, int visCacheSize) { + RFileWriter(FileSKVWriter fileSKVWriter, int visCacheSize, LoadPlanCollector loadPlanCollector) { this.writer = fileSKVWriter; this.validVisibilities = new LRUMap<>(visCacheSize); + this.loadPlanCollector = loadPlanCollector; } private void _startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies) @@ -106,6 +110,7 @@ public class RFileWriter implements AutoCloseable { "Cannot start a locality group after starting the default locality group"); writer.startNewLocalityGroup(name, columnFamilies); startedLG = true; + loadPlanCollector.startLocalityGroup(); } /** @@ -175,6 +180,7 @@ public class RFileWriter implements AutoCloseable { public void startDefaultLocalityGroup() throws IOException { Preconditions.checkState(!startedDefaultLG); + loadPlanCollector.startLocalityGroup(); writer.startDefaultLocalityGroup(); startedDefaultLG = true; startedLG = true; @@ -204,6 +210,7 @@ public class RFileWriter implements AutoCloseable { validVisibilities.put(new ArrayByteSequence(Arrays.copyOf(cv, cv.length)), Boolean.TRUE); } writer.append(key, val); + loadPlanCollector.append(key); } /** @@ -249,6 +256,31 @@ public class RFileWriter implements AutoCloseable { @Override public void close() throws IOException { - writer.close(); + try { + writer.close(); + } finally { + loadPlanCollector.close(); + } + } + + /** + * If no split resolver was provided when the RFileWriter was built then this method will return a + * simple load plan of type {@link org.apache.accumulo.core.data.LoadPlan.RangeType#FILE} using + * the first and last row seen. If a splitResolver was provided then this will return a load plan + * of type {@link org.apache.accumulo.core.data.LoadPlan.RangeType#TABLE} that has the split + * ranges the rows written overlapped. + * + * @param filename This file name will be used in the load plan and it should match the name that + * will be used when bulk importing this file. Only a filename is needed, not a full path. + * @return load plan computed from the keys written to the rfile. 
+ * @see org.apache.accumulo.core.client.rfile.RFile.WriterOptions#withSplitResolver(LoadPlan.SplitResolver) + * @since 2.1.4 + * @throws IllegalStateException is attempting to get load plan before calling {@link #close()} + * @throws IllegalArgumentException is a full path is passed instead of a filename + */ + public LoadPlan getLoadPlan(String filename) { + Preconditions.checkArgument(!filename.contains("/"), + "Unexpected path %s seen instead of file name", filename); + return loadPlanCollector.getLoadPlan(filename); } } diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java index 6382d568b5..5b12f4d73b 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java @@ -19,6 +19,7 @@ package org.apache.accumulo.core.client.rfile; import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; import java.io.IOException; import java.io.OutputStream; @@ -26,7 +27,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; -import java.util.Objects; import java.util.stream.Stream; import org.apache.accumulo.core.client.rfile.RFile.WriterFSOptions; @@ -37,6 +37,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.crypto.CryptoFactoryLoader; +import org.apache.accumulo.core.data.LoadPlan; import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.metadata.ValidationUtil; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; @@ -72,6 +73,7 @@ class RFileWriterBuilder implements RFile.OutputArguments, RFile.WriterFSOptions private int visCacheSize = 1000; private Map<String,String> samplerProps = Collections.emptyMap(); private Map<String,String> summarizerProps = Collections.emptyMap(); + private LoadPlan.SplitResolver splitResolver; private void checkDisjoint(Map<String,String> props, Map<String,String> derivedProps, String kind) { @@ -81,7 +83,7 @@ class RFileWriterBuilder implements RFile.OutputArguments, RFile.WriterFSOptions @Override public WriterOptions withSampler(SamplerConfiguration samplerConf) { - Objects.requireNonNull(samplerConf); + requireNonNull(samplerConf); Map<String,String> tmp = new SamplerConfigurationImpl(samplerConf).toTablePropertiesMap(); checkDisjoint(tableConfig, tmp, "sampler"); this.samplerProps = tmp; @@ -106,6 +108,9 @@ class RFileWriterBuilder implements RFile.OutputArguments, RFile.WriterFSOptions CryptoService cs = CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE, tableConfig); + var loadPlanCollector = + splitResolver == null ? 
new LoadPlanCollector() : new LoadPlanCollector(splitResolver); + if (out.getOutputStream() != null) { FSDataOutputStream fsdo; if (out.getOutputStream() instanceof FSDataOutputStream) { @@ -116,17 +121,19 @@ class RFileWriterBuilder implements RFile.OutputArguments, RFile.WriterFSOptions return new RFileWriter( fileops.newWriterBuilder().forOutputStream(".rf", fsdo, out.getConf(), cs) .withTableConfiguration(acuconf).withStartDisabled().build(), - visCacheSize); + visCacheSize, loadPlanCollector); } else { - return new RFileWriter(fileops.newWriterBuilder() - .forFile(out.path.toString(), out.getFileSystem(out.path), out.getConf(), cs) - .withTableConfiguration(acuconf).withStartDisabled().build(), visCacheSize); + return new RFileWriter( + fileops.newWriterBuilder() + .forFile(out.path.toString(), out.getFileSystem(out.path), out.getConf(), cs) + .withTableConfiguration(acuconf).withStartDisabled().build(), + visCacheSize, loadPlanCollector); } } @Override public WriterOptions withFileSystem(FileSystem fs) { - Objects.requireNonNull(fs); + requireNonNull(fs); out.fs = fs; return this; } @@ -140,14 +147,14 @@ class RFileWriterBuilder implements RFile.OutputArguments, RFile.WriterFSOptions @Override public WriterOptions to(OutputStream out) { - Objects.requireNonNull(out); + requireNonNull(out); this.out = new OutputArgs(out); return this; } @Override public WriterOptions withTableProperties(Iterable<Entry<String,String>> tableConfig) { - Objects.requireNonNull(tableConfig); + requireNonNull(tableConfig); HashMap<String,String> cfg = new HashMap<>(); for (Entry<String,String> entry : tableConfig) { cfg.put(entry.getKey(), entry.getValue()); @@ -161,7 +168,7 @@ class RFileWriterBuilder implements RFile.OutputArguments, RFile.WriterFSOptions @Override public WriterOptions withTableProperties(Map<String,String> tableConfig) { - Objects.requireNonNull(tableConfig); + requireNonNull(tableConfig); return withTableProperties(tableConfig.entrySet()); } @@ -172,9 +179,15 @@ class RFileWriterBuilder implements RFile.OutputArguments, RFile.WriterFSOptions return this; } + @Override + public WriterOptions withSplitResolver(LoadPlan.SplitResolver splitResolver) { + this.splitResolver = requireNonNull(splitResolver); + return this; + } + @Override public WriterOptions withSummarizers(SummarizerConfiguration... 
summarizerConf) { - Objects.requireNonNull(summarizerConf); + requireNonNull(summarizerConf); Map<String,String> tmp = SummarizerConfiguration.toTableProperties(summarizerConf); checkDisjoint(tableConfig, tmp, "summarizer"); this.summarizerProps = tmp; diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java index a85db74a86..1c155cd510 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java @@ -48,6 +48,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.function.Function; import java.util.stream.Stream; import org.apache.accumulo.core.Constants; @@ -324,19 +325,25 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti KeyExtent lookup(Text row); } - public static List<KeyExtent> findOverlappingTablets(KeyExtentCache extentCache, - FileSKVIterator reader) throws IOException { + /** + * Function that will find a row in a file being bulk imported that is >= the row passed to the + * function. If there is no row then it should return null. + */ + public interface NextRowFunction { + Text apply(Text row) throws IOException; + } + + public static List<KeyExtent> findOverlappingTablets(Function<Text,KeyExtent> rowToExtentResolver, + NextRowFunction nextRowFunction) throws IOException { List<KeyExtent> result = new ArrayList<>(); - Collection<ByteSequence> columnFamilies = Collections.emptyList(); Text row = new Text(); while (true) { - reader.seek(new Range(row, null), columnFamilies, false); - if (!reader.hasTop()) { + row = nextRowFunction.apply(row); + if (row == null) { break; } - row = reader.getTopKey().getRow(); - KeyExtent extent = extentCache.lookup(row); + KeyExtent extent = rowToExtentResolver.apply(row); result.add(extent); row = extent.endRow(); if (row != null) { @@ -356,13 +363,23 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti } public static List<KeyExtent> findOverlappingTablets(ClientContext context, - KeyExtentCache extentCache, Path file, FileSystem fs, Cache<String,Long> fileLenCache, + KeyExtentCache keyExtentCache, Path file, FileSystem fs, Cache<String,Long> fileLenCache, CryptoService cs) throws IOException { try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder() .forFile(file.toString(), fs, fs.getConf(), cs) .withTableConfiguration(context.getConfiguration()).withFileLenCache(fileLenCache) .seekToBeginning().build()) { - return findOverlappingTablets(extentCache, reader); + + Collection<ByteSequence> columnFamilies = Collections.emptyList(); + NextRowFunction nextRowFunction = row -> { + reader.seek(new Range(row, null), columnFamilies, false); + if (!reader.hasTop()) { + return null; + } + return reader.getTopKey().getRow(); + }; + + return findOverlappingTablets(keyExtentCache::lookup, nextRowFunction); } } @@ -517,8 +534,8 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti continue; } - if (FileOperations.getBulkWorkingFiles().contains(fname)) { - log.debug("{} is an internal working file, ignoring.", fileStatus.getPath()); + if (FileOperations.isBulkWorkingFile(fname)) { + log.trace("{} is an internal working file, ignoring.", fileStatus.getPath()); continue; } diff --git 
a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java index aecd25f663..c06fbb83c2 100644 --- a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java +++ b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java @@ -20,17 +20,31 @@ package org.apache.accumulo.core.data; import static java.nio.charset.StandardCharsets.UTF_8; +import java.io.IOException; +import java.net.URI; import java.nio.file.Paths; import java.util.Arrays; +import java.util.Base64; import java.util.Collection; import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.SortedSet; +import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.accumulo.core.client.admin.TableOperations.ImportMappingOptions; +import org.apache.accumulo.core.client.rfile.RFile; +import org.apache.accumulo.core.clientImpl.bulk.BulkImport; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.primitives.UnsignedBytes; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@ -125,7 +139,7 @@ public class LoadPlan { "Start row is greater than or equal to end row : " + srs + " " + ers); } } else { - throw new RuntimeException(); + throw new IllegalStateException(); } } @@ -228,4 +242,230 @@ public class LoadPlan { } }; } + + private static class JsonDestination { + String fileName; + String startRow; + String endRow; + RangeType rangeType; + + JsonDestination() {} + + JsonDestination(Destination destination) { + fileName = destination.getFileName(); + startRow = destination.getStartRow() == null ? null + : Base64.getUrlEncoder().encodeToString(destination.getStartRow()); + endRow = destination.getEndRow() == null ? null + : Base64.getUrlEncoder().encodeToString(destination.getEndRow()); + rangeType = destination.getRangeType(); + } + + Destination toDestination() { + return new Destination(fileName, rangeType, + startRow == null ? null : Base64.getUrlDecoder().decode(startRow), + endRow == null ? null : Base64.getUrlDecoder().decode(endRow)); + } + } + + private static final class JsonAll { + List<JsonDestination> destinations; + + JsonAll() {} + + JsonAll(List<Destination> destinations) { + this.destinations = + destinations.stream().map(JsonDestination::new).collect(Collectors.toList()); + } + + } + + private static final Gson gson = new GsonBuilder().disableJdkUnsafe().serializeNulls().create(); + + /** + * Serializes the load plan to json that looks like the following. The values of startRow and + * endRow field are base64 encoded using {@link Base64#getUrlEncoder()}. + * + * <pre> + * { + * "destinations": [ + * { + * "fileName": "f1.rf", + * "startRow": null, + * "endRow": "MDAz", + * "rangeType": "TABLE" + * }, + * { + * "fileName": "f2.rf", + * "startRow": "MDA0", + * "endRow": "MDA3", + * "rangeType": "FILE" + * }, + * { + * "fileName": "f1.rf", + * "startRow": "MDA1", + * "endRow": "MDA2", + * "rangeType": "TABLE" + * }, + * { + * "fileName": "f3.rf", + * "startRow": "MDA4", + * "endRow": null, + * "rangeType": "TABLE" + * } + * ] + * } + * </pre> + * + * @since 2.1.4 + */ + public String toJson() { + return gson.toJson(new JsonAll(destinations)); + } + + /** + * Deserializes json to a load plan. 
+ * + * @param json produced by {@link #toJson()} + */ + public static LoadPlan fromJson(String json) { + var dests = gson.fromJson(json, JsonAll.class).destinations.stream() + .map(JsonDestination::toDestination).collect(Collectors.toUnmodifiableList()); + return new LoadPlan(dests); + } + + /** + * Represents two split points that exist in a table being bulk imported to. + * + * @since 2.1.4 + */ + public static class TableSplits { + private final Text prevRow; + private final Text endRow; + + public TableSplits(Text prevRow, Text endRow) { + Preconditions.checkArgument( + prevRow == null || endRow == null || prevRow.compareTo(endRow) < 0, "%s >= %s", prevRow, + endRow); + this.prevRow = prevRow; + this.endRow = endRow; + } + + public Text getPrevRow() { + return prevRow; + } + + public Text getEndRow() { + return endRow; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TableSplits that = (TableSplits) o; + return Objects.equals(prevRow, that.prevRow) && Objects.equals(endRow, that.endRow); + } + + @Override + public int hashCode() { + return Objects.hash(prevRow, endRow); + } + + @Override + public String toString() { + return "(" + prevRow + "," + endRow + "]"; + } + } + + /** + * A function that maps a row to two table split points that contain the row. These splits must + * exist in the table being bulk imported to. There is no requirement that the splits are + * contiguous. For example if a table has splits C,D,E,M and we ask for splits containing row H + * its ok to return D,M, but that could result in the file mapping to more actual tablets than + * needed. For a row that falls before or after all table splits, use null to represent -inf and + * +inf. For example if a table has splits C,D,E,M and row B is resolved it is ok to return + * null,C. If row Q is resolved for table splits C,D,E,M it would be ok to return M,null. For a + * table with zero splits, the resolver should return null,null for all rows. + * + * @since 2.1.4 + */ + public interface SplitResolver extends Function<Text,TableSplits> { + static SplitResolver from(SortedSet<Text> splits) { + return row -> { + var headSet = splits.headSet(row); + Text prevRow = headSet.isEmpty() ? null : headSet.last(); + var tailSet = splits.tailSet(row); + Text endRow = tailSet.isEmpty() ? null : tailSet.first(); + return new TableSplits(prevRow, endRow); + }; + } + + /** + * For a given row, R, this function should find two split points, S1 and S2, that exist in the + * table being bulk imported to, such that S1 < R <= S2. The closer S1 and S2 are to each + * other, the better. + */ + @Override + TableSplits apply(Text row); + } + + /** + * Computes a load plan for a given rfile. This will open the rfile and find every + * {@link TableSplits} that overlaps rows in the file and add those to the returned load plan. + * + * @since 2.1.4 + */ + public static LoadPlan compute(URI file, SplitResolver splitResolver) throws IOException { + return compute(file, Map.of(), splitResolver); + } + + // KeyExtent requires a tableId and this code needs to use KeyExtent functionality but does not + // have a tableId or care what the tableId is. So this fake id is used with KeyExtent. + private static final TableId FAKE_ID = TableId.of("999"); + + /** + * Computes a load plan for a given rfile. This will open the rfile and find every + * {@link TableSplits} that overlaps rows in the file and add those to the returned load plan. 
+ * + * @param properties used when opening the rfile, see + * {@link org.apache.accumulo.core.client.rfile.RFile.ScannerOptions#withTableProperties(Map)} + * @since 2.1.4 + */ + public static LoadPlan compute(URI file, Map<String,String> properties, + SplitResolver splitResolver) throws IOException { + try (var scanner = RFile.newScanner().from(file.toString()).withoutSystemIterators() + .withTableProperties(properties).withIndexCache(10_000_000).build()) { + BulkImport.NextRowFunction nextRowFunction = row -> { + scanner.setRange(new Range(row, null)); + var iter = scanner.iterator(); + if (iter.hasNext()) { + return iter.next().getKey().getRow(); + } else { + return null; + } + }; + + Function<Text,KeyExtent> rowToExtentResolver = row -> { + var tabletRange = splitResolver.apply(row); + var extent = new KeyExtent(FAKE_ID, tabletRange.endRow, tabletRange.prevRow); + Preconditions.checkState(extent.contains(row), "%s does not contain %s", tabletRange, row); + return extent; + }; + + List<KeyExtent> overlapping = + BulkImport.findOverlappingTablets(rowToExtentResolver, nextRowFunction); + + Path path = new Path(file); + + var builder = builder(); + for (var extent : overlapping) { + builder.loadFileTo(path.getName(), RangeType.TABLE, extent.prevEndRow(), extent.endRow()); + } + return builder.build(); + } + } } diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java index db82b0d149..f83d202b2f 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java @@ -53,6 +53,11 @@ public abstract class FileOperations { Set.of(Constants.BULK_LOAD_MAPPING, Constants.BULK_RENAME_FILE, FileOutputCommitter.SUCCEEDED_FILE_NAME, HADOOP_JOBHISTORY_LOCATION); + public static boolean isBulkWorkingFile(String fileName) { + return fileName.startsWith(Constants.BULK_WORKING_PREFIX) + || bulkWorkingFiles.contains(fileName); + } + public static Set<String> getValidExtensions() { return validExtensions; } diff --git a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java index 716803b356..58e56abf0a 100644 --- a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java +++ b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java @@ -28,6 +28,7 @@ import static org.junit.jupiter.api.Assertions.fail; import java.io.File; import java.io.IOException; import java.net.ConnectException; +import java.net.URI; import java.security.SecureRandom; import java.util.AbstractMap; import java.util.ArrayList; @@ -40,7 +41,11 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.SortedMap; +import java.util.SortedSet; import java.util.TreeMap; +import java.util.TreeSet; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; @@ -57,6 +62,8 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.ArrayByteSequence; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.LoadPlan; +import org.apache.accumulo.core.data.LoadPlanTest; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import 
org.apache.accumulo.core.file.FileOperations; @@ -881,4 +888,207 @@ public class RFileClientTest { assertTrue(Arrays.stream(exception3.getStackTrace()) .anyMatch(ste -> ste.getClassName().contains(localFsClass))); } + + @Test + public void testLoadPlanEmpty() throws Exception { + LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); + + LoadPlan.SplitResolver splitResolver = + LoadPlan.SplitResolver.from(new TreeSet<>(List.of(new Text("m")))); + + for (boolean withSplits : List.of(true, false)) { + String testFile = createTmpTestFile(); + var builder = RFile.newWriter().to(testFile).withFileSystem(localFs); + if (withSplits) { + builder = builder.withSplitResolver(splitResolver); + } + var writer = builder.build(); + + // can not get load plan before closing file + assertThrows(IllegalStateException.class, + () -> writer.getLoadPlan(new Path(testFile).getName())); + + try (writer) { + writer.startDefaultLocalityGroup(); + assertThrows(IllegalStateException.class, + () -> writer.getLoadPlan(new Path(testFile).getName())); + } + var loadPlan = writer.getLoadPlan(new Path(testFile).getName()); + assertEquals(0, loadPlan.getDestinations().size()); + + loadPlan = LoadPlan.compute(new URI(testFile), splitResolver); + assertEquals(0, loadPlan.getDestinations().size()); + } + } + + @Test + public void testLoadPlanLocalityGroupsNoSplits() throws Exception { + LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); + + String testFile = createTmpTestFile(); + var writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build(); + try (writer) { + writer.startNewLocalityGroup("LG1", "F1"); + writer.append(new Key("001", "F1"), "V1"); + writer.append(new Key("005", "F1"), "V2"); + writer.startNewLocalityGroup("LG2", "F3"); + writer.append(new Key("003", "F3"), "V3"); + writer.append(new Key("004", "F3"), "V4"); + writer.startDefaultLocalityGroup(); + writer.append(new Key("007", "F4"), "V5"); + writer.append(new Key("009", "F4"), "V6"); + } + + var filename = new Path(testFile).getName(); + var loadPlan = writer.getLoadPlan(filename); + assertEquals(1, loadPlan.getDestinations().size()); + + // The minimum and maximum rows happend in different locality groups, the load plan should + // reflect this + var expectedLoadPlan = + LoadPlan.builder().loadFileTo(filename, LoadPlan.RangeType.FILE, "001", "009").build(); + assertEquals(expectedLoadPlan.toJson(), loadPlan.toJson()); + } + + @Test + public void testLoadPlanLocalityGroupsSplits() throws Exception { + LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); + + SortedSet<Text> splits = + Stream.of("001", "002", "003", "004", "005", "006", "007", "008", "009").map(Text::new) + .collect(Collectors.toCollection(TreeSet::new)); + var splitResolver = LoadPlan.SplitResolver.from(splits); + + String testFile = createTmpTestFile(); + var writer = RFile.newWriter().to(testFile).withFileSystem(localFs) + .withSplitResolver(splitResolver).build(); + try (writer) { + writer.startNewLocalityGroup("LG1", "F1"); + writer.append(new Key("001", "F1"), "V1"); + writer.append(new Key("005", "F1"), "V2"); + writer.startNewLocalityGroup("LG2", "F3"); + writer.append(new Key("003", "F3"), "V3"); + writer.append(new Key("005", "F3"), "V3"); + writer.append(new Key("007", "F3"), "V4"); + writer.startDefaultLocalityGroup(); + writer.append(new Key("007", "F4"), "V5"); + writer.append(new Key("009", "F4"), "V6"); + } + + var filename = new Path(testFile).getName(); + var loadPlan = writer.getLoadPlan(filename); + assertEquals(5, 
loadPlan.getDestinations().size()); + + var builder = LoadPlan.builder(); + builder.loadFileTo(filename, LoadPlan.RangeType.TABLE, null, "001"); + builder.loadFileTo(filename, LoadPlan.RangeType.TABLE, "004", "005"); + builder.loadFileTo(filename, LoadPlan.RangeType.TABLE, "002", "003"); + builder.loadFileTo(filename, LoadPlan.RangeType.TABLE, "006", "007"); + builder.loadFileTo(filename, LoadPlan.RangeType.TABLE, "008", "009"); + assertEquals(LoadPlanTest.toString(builder.build().getDestinations()), + LoadPlanTest.toString(loadPlan.getDestinations())); + + loadPlan = LoadPlan.compute(new URI(testFile), splitResolver); + assertEquals(LoadPlanTest.toString(builder.build().getDestinations()), + LoadPlanTest.toString(loadPlan.getDestinations())); + } + + @Test + public void testIncorrectSplitResolver() throws Exception { + // for some rows the returns table splits will not contain the row. This should cause an error. + LoadPlan.SplitResolver splitResolver = + row -> new LoadPlan.TableSplits(new Text("003"), new Text("005")); + + LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); + + String testFile = createTmpTestFile(); + var writer = RFile.newWriter().to(testFile).withFileSystem(localFs) + .withSplitResolver(splitResolver).build(); + try (writer) { + writer.startDefaultLocalityGroup(); + writer.append(new Key("004", "F4"), "V2"); + var e = assertThrows(IllegalStateException.class, + () -> writer.append(new Key("007", "F4"), "V2")); + assertTrue(e.getMessage().contains("(003,005]")); + assertTrue(e.getMessage().contains("007")); + } + + var testFile2 = createTmpTestFile(); + var writer2 = RFile.newWriter().to(testFile2).withFileSystem(localFs).build(); + try (writer2) { + writer2.startDefaultLocalityGroup(); + writer2.append(new Key("004", "F4"), "V2"); + writer2.append(new Key("007", "F4"), "V2"); + } + + var e = assertThrows(IllegalStateException.class, + () -> LoadPlan.compute(new URI(testFile), splitResolver)); + assertTrue(e.getMessage().contains("(003,005]")); + assertTrue(e.getMessage().contains("007")); + } + + @Test + public void testGetLoadPlanBeforeClose() throws Exception { + LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); + + String testFile = createTmpTestFile(); + var writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build(); + try (writer) { + var e = assertThrows(IllegalStateException.class, + () -> writer.getLoadPlan(new Path(testFile).getName())); + assertEquals("Attempted to get load plan before closing", e.getMessage()); + writer.startDefaultLocalityGroup(); + writer.append(new Key("004", "F4"), "V2"); + var e2 = assertThrows(IllegalStateException.class, + () -> writer.getLoadPlan(new Path(testFile).getName())); + assertEquals("Attempted to get load plan before closing", e2.getMessage()); + } + } + + @Test + public void testGetLoadPlanWithPath() throws Exception { + LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); + + String testFile = createTmpTestFile(); + var writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build(); + writer.close(); + + var e = + assertThrows(IllegalArgumentException.class, () -> writer.getLoadPlan(testFile.toString())); + assertTrue(e.getMessage().contains("Unexpected path")); + assertEquals(0, writer.getLoadPlan(new Path(testFile).getName()).getDestinations().size()); + } + + @Test + public void testComputeLoadPlanWithPath() throws Exception { + LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); + + SortedSet<Text> splits = + Stream.of("001", 
"002", "003", "004", "005", "006", "007", "008", "009").map(Text::new) + .collect(Collectors.toCollection(TreeSet::new)); + var splitResolver = LoadPlan.SplitResolver.from(splits); + + String testFile = createTmpTestFile(); + var writer = RFile.newWriter().to(testFile).withFileSystem(localFs) + .withSplitResolver(splitResolver).build(); + writer.startDefaultLocalityGroup(); + writer.append(new Key("001", "V4"), "test"); + writer.append(new Key("002", "V4"), "test"); + writer.append(new Key("003", "V4"), "test"); + writer.append(new Key("004", "V4"), "test"); + writer.close(); + + var e = assertThrows(IllegalArgumentException.class, () -> writer.getLoadPlan(testFile)); + assertTrue(e.getMessage().contains("Unexpected path")); + assertEquals(4, writer.getLoadPlan(new Path(testFile).getName()).getDestinations().size()); + assertEquals(4, LoadPlan.compute(new URI(testFile), splitResolver).getDestinations().size()); + + String hdfsHost = "127.0.0.5:8080"; + String fileUri = "hdfs://" + hdfsHost + "/bulk-xyx/file1.rf"; + URI uri = new URI(fileUri); + var err = assertThrows(RuntimeException.class, () -> LoadPlan.compute(uri, splitResolver)); + assertTrue(err.getMessage().contains("to " + hdfsHost + " failed on connection exception")); + assertTrue(Arrays.stream(err.getCause().getStackTrace()) + .anyMatch(ste -> ste.getClassName().contains(DistributedFileSystem.class.getName()))); + } } diff --git a/core/src/test/java/org/apache/accumulo/core/data/LoadPlanTest.java b/core/src/test/java/org/apache/accumulo/core/data/LoadPlanTest.java index bba30a555d..18f2038163 100644 --- a/core/src/test/java/org/apache/accumulo/core/data/LoadPlanTest.java +++ b/core/src/test/java/org/apache/accumulo/core/data/LoadPlanTest.java @@ -23,8 +23,11 @@ import static java.util.stream.Collectors.toSet; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import java.util.Base64; +import java.util.Collection; import java.util.HashSet; import java.util.Set; +import java.util.stream.Collectors; import org.apache.accumulo.core.data.LoadPlan.Destination; import org.apache.accumulo.core.data.LoadPlan.RangeType; @@ -100,6 +103,47 @@ public class LoadPlanTest { assertEquals(expected, actual); + var loadPlan2 = LoadPlan.fromJson(loadPlan.toJson()); + Set<String> actual2 = + loadPlan2.getDestinations().stream().map(LoadPlanTest::toString).collect(toSet()); + assertEquals(expected, actual2); + } + + @Test + public void testJson() { + var loadPlan = LoadPlan.builder().build(); + assertEquals(0, loadPlan.getDestinations().size()); + assertEquals("{\"destinations\":[]}", loadPlan.toJson()); + + var builder = LoadPlan.builder(); + builder.loadFileTo("f1.rf", RangeType.TABLE, null, "003"); + builder.loadFileTo("f2.rf", RangeType.FILE, "004", "007"); + builder.loadFileTo("f1.rf", RangeType.TABLE, "005", "006"); + builder.loadFileTo("f3.rf", RangeType.TABLE, "008", null); + String json = builder.build().toJson(); + + String b64003 = Base64.getUrlEncoder().encodeToString("003".getBytes(UTF_8)); + String b64004 = Base64.getUrlEncoder().encodeToString("004".getBytes(UTF_8)); + String b64005 = Base64.getUrlEncoder().encodeToString("005".getBytes(UTF_8)); + String b64006 = Base64.getUrlEncoder().encodeToString("006".getBytes(UTF_8)); + String b64007 = Base64.getUrlEncoder().encodeToString("007".getBytes(UTF_8)); + String b64008 = Base64.getUrlEncoder().encodeToString("008".getBytes(UTF_8)); + + String expected = 
"{'destinations':[{'fileName':'f1.rf','startRow':null,'endRow':'" + b64003 + + "','rangeType':'TABLE'},{'fileName':'f2.rf','startRow':'" + b64004 + "','endRow':'" + + b64007 + "','rangeType':'FILE'},{'fileName':'f1.rf','startRow':'" + b64005 + + "','endRow':'" + b64006 + "','rangeType':'TABLE'},{'fileName':'f3.rf','startRow':'" + + b64008 + "','endRow':null,'rangeType':'TABLE'}]}"; + + assertEquals(expected.replace("'", "\""), json); + } + + @Test + public void testTableSplits() { + assertThrows(IllegalArgumentException.class, + () -> new LoadPlan.TableSplits(new Text("004"), new Text("004"))); + assertThrows(IllegalArgumentException.class, + () -> new LoadPlan.TableSplits(new Text("004"), new Text("003"))); } private static String toString(Destination d) { @@ -110,4 +154,8 @@ public class LoadPlanTest { private static String toString(byte[] r) { return r == null ? null : new String(r, UTF_8); } + + public static Set<String> toString(Collection<Destination> destinations) { + return destinations.stream().map(d -> toString(d)).collect(Collectors.toSet()); + } } diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java index 33545639ef..4b0c533dd1 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java @@ -366,7 +366,6 @@ public class RFileTest { public void test1() throws IOException { // test an empty file - TestRFile trf = new TestRFile(conf); trf.openWriter(); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java index a8aebd2e25..683461b8c3 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java @@ -31,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.FileNotFoundException; import java.io.IOException; import java.math.BigInteger; +import java.net.URI; import java.nio.file.Files; import java.nio.file.Paths; import java.security.MessageDigest; @@ -88,6 +89,7 @@ import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.constraints.MetadataConstraints; import org.apache.accumulo.server.constraints.SystemEnvironment; import org.apache.accumulo.test.util.Wait; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -476,6 +478,63 @@ public class BulkNewIT extends SharedMiniClusterBase { } } + @Test + public void testComputeLoadPlan() throws Exception { + + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + addSplits(c, tableName, "0333 0666 0999 1333 1666"); + + String dir = getDir("/testBulkFile-"); + + Map<String,Set<String>> hashes = new HashMap<>(); + String h1 = writeData(dir + "/f1.", aconf, 0, 333); + hashes.put("0333", new HashSet<>(List.of(h1))); + String h2 = writeData(dir + "/f2.", aconf, 0, 666); + hashes.get("0333").add(h2); + hashes.put("0666", new HashSet<>(List.of(h2))); + String h3 = writeData(dir + "/f3.", aconf, 334, 700); + hashes.get("0666").add(h3); + hashes.put("0999", new HashSet<>(List.of(h3))); + hashes.put("1333", Set.of()); + hashes.put("1666", Set.of()); + hashes.put("null", Set.of()); + + SortedSet<Text> splits = new 
TreeSet<>(c.tableOperations().listSplits(tableName)); + + for (String filename : List.of("f1.rf", "f2.rf", "f3.rf")) { + // The body of this loop simulates what each reducer would do + Path path = new Path(dir + "/" + filename); + + // compute the load plan for the rfile + URI file = path.toUri(); + String lpJson = LoadPlan.compute(file, LoadPlan.SplitResolver.from(splits)).toJson(); + + // save the load plan to a file + Path lpPath = new Path(path.getParent(), path.getName().replace(".rf", ".lp")); + try (var output = getCluster().getFileSystem().create(lpPath, false)) { + IOUtils.write(lpJson, output, UTF_8); + } + } + + // This simulates the code that would run after the map reduce job and bulk import the files + var builder = LoadPlan.builder(); + for (var status : getCluster().getFileSystem().listStatus(new Path(dir), + p -> p.getName().endsWith(".lp"))) { + try (var input = getCluster().getFileSystem().open(status.getPath())) { + String lpJson = IOUtils.toString(input, UTF_8); + builder.addPlan(LoadPlan.fromJson(lpJson)); + } + } + + LoadPlan lpAll = builder.build(); + + c.tableOperations().importDirectory(dir).to(tableName).plan(lpAll).load(); + + verifyData(c, tableName, 0, 700, false); + verifyMetadata(c, tableName, hashes); + } + } + @Test public void testEmptyDir() throws Exception { try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { @@ -621,7 +680,7 @@ public class BulkNewIT extends SharedMiniClusterBase { String endRow = tablet.getEndRow() == null ? "null" : tablet.getEndRow().toString(); - assertEquals(expectedHashes.get(endRow), fileHashes); + assertEquals(expectedHashes.get(endRow), fileHashes, "endRow " + endRow); endRowsSeen.add(endRow); }
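To illustrate the second (map reduce) use case from the commit message, here is a minimal writer-side sketch. It assumes a local filesystem with made-up rows, split points, and an output path; in a real reducer the resulting json would be written to a side file next to the rfile for the post-job merge step shown earlier.

```java
import java.util.SortedSet;
import java.util.TreeSet;

import org.apache.accumulo.core.client.rfile.RFile;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.LoadPlan;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;

public class WriterLoadPlanSketch {
  public static void main(String[] args) throws Exception {
    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
    String rfile = "/tmp/bulk/f1.rf"; // hypothetical output file

    // split points of the table being bulk imported to; a real job would fetch these
    // once via tableOperations().listSplits() and distribute them to the reducers
    SortedSet<Text> splits = new TreeSet<>();
    splits.add(new Text("m"));
    var splitResolver = LoadPlan.SplitResolver.from(splits);

    // build the LoadPlan while writing, using the new withSplitResolver() option
    var writer = RFile.newWriter().to(rfile).withFileSystem(localFs)
        .withSplitResolver(splitResolver).build();
    try (writer) {
      writer.startDefaultLocalityGroup();
      writer.append(new Key("a1", "fam"), "v1");
      writer.append(new Key("z9", "fam"), "v2");
    }

    // after close(), the writer can produce a TABLE range type load plan; only the
    // file name (not the full path) goes into the plan
    String json = writer.getLoadPlan(new Path(rfile).getName()).toJson();
    System.out.println(json);
  }
}
```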