Merge branch '1.7'

Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/7059d734
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/7059d734
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/7059d734

Branch: refs/heads/master
Commit: 7059d734b36df229942ebbeac01c635a2a004ba3
Parents: f7a543a af5a37d
Author: Josh Elser <els...@apache.org>
Authored: Mon Aug 24 01:07:03 2015 -0400
Committer: Josh Elser <els...@apache.org>
Committed: Mon Aug 24 01:22:13 2015 -0400

----------------------------------------------------------------------
 .../test/BulkImportSequentialRowsIT.java        | 112 +++++++++++++++++++
 1 file changed, 112 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/7059d734/test/src/main/java/org/apache/accumulo/test/BulkImportSequentialRowsIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/BulkImportSequentialRowsIT.java
index 0000000,0000000..037ec7e
new file mode 100644
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/BulkImportSequentialRowsIT.java
@@@ -1,0 -1,0 +1,112 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements.  See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.accumulo.test;
++
++import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.assertTrue;
++
++import java.util.SortedSet;
++import java.util.TreeSet;
++
++import org.apache.accumulo.core.client.admin.TableOperations;
++import org.apache.accumulo.core.conf.DefaultConfiguration;
++import org.apache.accumulo.core.data.Key;
++import org.apache.accumulo.core.data.Value;
++import org.apache.accumulo.core.file.FileOperations;
++import org.apache.accumulo.core.file.FileSKVWriter;
++import org.apache.accumulo.core.security.Authorizations;
++import org.apache.accumulo.harness.AccumuloClusterHarness;
++import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.fs.RawLocalFileSystem;
++import org.apache.hadoop.io.Text;
++import org.junit.Test;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import com.google.common.collect.Iterables;
++
++// ACCUMULO-3967: a bulk-imported file spanning many tablets could skip some of them
++public class BulkImportSequentialRowsIT extends AccumuloClusterHarness {
++  private static final Logger log = LoggerFactory.getLogger(BulkImportSequentialRowsIT.class);
++
++  @Override
++  public int defaultTimeoutSeconds() {
++    return 60;
++  }
++
++  @Override
++  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
++    // Need more than one tserver
++    cfg.setNumTservers(2);
++
++    // use raw local file system so walogs sync and flush will work
++    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
++  }
++
++  @Test
++  public void testBulkImportFailure() throws Exception {
++    String tableName = getUniqueNames(1)[0];
++    TableOperations to = getConnector().tableOperations();
++    to.create(tableName);
++    FileSystem fs = getFileSystem();
++    Path rootPath = getUsableDir();
++    Path bulk = new Path(rootPath, "bulk");
++    log.info("bulk: {}", bulk);
++    if (fs.exists(bulk)) {
++      fs.delete(bulk, true);
++    }
++    assertTrue(fs.mkdirs(bulk));
++    Path err = new Path(rootPath, "err");
++    log.info("err: {}", err);
++    if (fs.exists(err)) {
++      fs.delete(err, true);
++    }
++    assertTrue(fs.mkdirs(err));
++
++    Path rfile = new Path(bulk, "file.rf");
++    FileSKVWriter writer = FileOperations.getInstance().openWriter(rfile.toString(), fs, new Configuration(), DefaultConfiguration.getInstance());
++    writer.startDefaultLocalityGroup();
++
++    final Value emptyValue = new Value(new byte[0]);
++    final SortedSet<Text> splits = new TreeSet<Text>();
++    for (int i = 0; i < 100; i++) {
++      String row = String.format("%03d", i);
++      splits.add(new Text(row));
++      writer.append(new Key(row, "", ""), emptyValue);
++      for (int j = 0; j < 100; j++) {
++        writer.append(new Key(row, "", String.format("%03d", j)), emptyValue);
++      }
++    }
++    writer.close();
++
++    assertTrue(fs.exists(rfile));
++
++    // Add some splits
++    to.addSplits(tableName, splits);
++
++    // Then import a single rfile into all the tablets, hoping that the balancer moving tablets
++    // around causes an import failure, so we can verify that the bug is actually fixed.
++    to.importDirectory(tableName, bulk.toString(), err.toString(), false);
++
++    // The bug is that some tablets don't get imported into.
++    assertEquals(10100, Iterables.size(getConnector().createScanner(tableName, Authorizations.EMPTY))); // 100 rows x 101 entries per row
++  }
++
++}
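
For context, the test drives the same public client API an application would use for bulk import. A minimal standalone sketch of that pattern (not part of this commit; the instance name, ZooKeeper host, credentials, table name, and directory paths below are placeholders):

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;

public class BulkImportSketch {
  public static void main(String[] args) throws Exception {
    // Connect to a running 1.7-era instance (placeholder instance/host/credentials)
    Connector conn = new ZooKeeperInstance("accumulo", "zkhost:2181")
        .getConnector("root", new PasswordToken("secret"));
    TableOperations to = conn.tableOperations();
    if (!to.exists("mytable")) {
      to.create("mytable");
    }
    // importDirectory assigns the rfiles under the bulk directory to the tablets
    // whose ranges they span; files that fail to import are moved to the failure
    // directory. setTime=false keeps the timestamps already written in the rfiles.
    to.importDirectory("mytable", "/tmp/bulk", "/tmp/bulk-fail", false);
  }
}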
