Merge branch '1.7'

Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e5325306
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e5325306
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e5325306

Branch: refs/heads/master
Commit: e5325306660e733d3c2b691d52feb662a6dac86b
Parents: c4d6eee d1a9c52
Author: Dave Marion <dlmar...@apache.org>
Authored: Wed Mar 2 16:01:44 2016 -0500
Committer: Dave Marion <dlmar...@apache.org>
Committed: Wed Mar 2 16:01:44 2016 -0500

----------------------------------------------------------------------
 .../client/impl/TabletServerBatchWriter.java    | 126 +++++++++++++------
 .../accumulo/core/util/SimpleThreadPool.java    |   6 +
 .../test/functional/BatchWriterFlushIT.java     |  91 +++++++++++++-
 3 files changed, 182 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/e5325306/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
----------------------------------------------------------------------
diff --cc 
core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
index bc90d00,8922ac5..35281d6
--- 
a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
@@@ -638,12 -666,14 +665,14 @@@ public class TabletServerBatchWriter 
        queued = new HashSet<String>();
        sendThreadPool = new SimpleThreadPool(numSendThreads, 
this.getClass().getName());
        locators = new HashMap<String,TabletLocator>();
+       binningThreadPool = new SimpleThreadPool(1, "BinMutations", new 
SynchronousQueue<Runnable>());
+       binningThreadPool.setRejectedExecutionHandler(new 
ThreadPoolExecutor.CallerRunsPolicy());
      }
  
-     private TabletLocator getLocator(String tableId) {
+     private synchronized TabletLocator getLocator(String tableId) {
        TabletLocator ret = locators.get(tableId);
        if (ret == null) {
 -        ret = TabletLocator.getLocator(context, new Text(tableId));
 +        ret = TabletLocator.getLocator(context, tableId);
          ret = new TimeoutTabletLocator(ret, timeout);
          locators.put(tableId, ret);
        }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e5325306/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
----------------------------------------------------------------------
diff --cc 
test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
index 353a6b9,0000000..5aa0c84
mode 100644,000000..100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
@@@ -1,178 -1,0 +1,265 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
++import static 
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +import static java.nio.charset.StandardCharsets.UTF_8;
 +
++import java.util.ArrayList;
++import java.util.Collections;
++import java.util.HashSet;
 +import java.util.Iterator;
++import java.util.LinkedList;
++import java.util.List;
 +import java.util.Map.Entry;
 +import java.util.Random;
++import java.util.Set;
++import java.util.TreeSet;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.MutationsRejectedException;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.harness.AccumuloClusterHarness;
++import org.apache.accumulo.core.util.SimpleThreadPool;
 +import org.apache.hadoop.io.Text;
++import org.junit.Assert;
 +import org.junit.Test;
 +
 +import com.google.common.collect.Iterators;
- import static 
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +
 +public class BatchWriterFlushIT extends AccumuloClusterHarness {
 +
 +  private static final int NUM_TO_FLUSH = 100000;
++  private static final int NUM_THREADS = 3;
 +
 +  @Override
 +  protected int defaultTimeoutSeconds() {
 +    return 90;
 +  }
 +
 +  @Test
 +  public void run() throws Exception {
 +    Connector c = getConnector();
 +    String[] tableNames = getUniqueNames(2);
 +    String bwft = tableNames[0];
 +    c.tableOperations().create(bwft);
 +    String bwlt = tableNames[1];
 +    c.tableOperations().create(bwlt);
 +    runFlushTest(bwft);
 +    runLatencyTest(bwlt);
- 
 +  }
 +
 +  private void runLatencyTest(String tableName) throws Exception {
 +    // should automatically flush after 2 seconds
 +    BatchWriter bw = getConnector().createBatchWriter(tableName, new 
BatchWriterConfig().setMaxLatency(1000, TimeUnit.MILLISECONDS));
 +    Scanner scanner = getConnector().createScanner(tableName, 
Authorizations.EMPTY);
 +
 +    Mutation m = new Mutation(new Text(String.format("r_%10d", 1)));
 +    m.put(new Text("cf"), new Text("cq"), new Value("1".getBytes(UTF_8)));
 +    bw.addMutation(m);
 +
 +    sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
 +
 +    int count = Iterators.size(scanner.iterator());
 +
 +    if (count != 0) {
 +      throw new Exception("Flushed too soon");
 +    }
 +
 +    sleepUninterruptibly(1500, TimeUnit.MILLISECONDS);
 +
 +    count = Iterators.size(scanner.iterator());
 +
 +    if (count != 1) {
 +      throw new Exception("Did not flush");
 +    }
 +
 +    bw.close();
 +  }
 +
 +  private void runFlushTest(String tableName) throws AccumuloException, 
AccumuloSecurityException, TableNotFoundException, MutationsRejectedException,
 +      Exception {
 +    BatchWriter bw = getConnector().createBatchWriter(tableName, new 
BatchWriterConfig());
 +    Scanner scanner = getConnector().createScanner(tableName, 
Authorizations.EMPTY);
 +    Random r = new Random();
 +
 +    for (int i = 0; i < 4; i++) {
 +      for (int j = 0; j < NUM_TO_FLUSH; j++) {
 +        int row = i * NUM_TO_FLUSH + j;
 +
 +        Mutation m = new Mutation(new Text(String.format("r_%10d", row)));
 +        m.put(new Text("cf"), new Text("cq"), new Value(("" + 
row).getBytes()));
 +        bw.addMutation(m);
 +      }
 +
 +      bw.flush();
 +
 +      // do a few random lookups into the data just flushed
 +
 +      for (int k = 0; k < 10; k++) {
 +        int rowToLookup = r.nextInt(NUM_TO_FLUSH) + i * NUM_TO_FLUSH;
 +
 +        scanner.setRange(new Range(new Text(String.format("r_%10d", 
rowToLookup))));
 +
 +        Iterator<Entry<Key,Value>> iter = scanner.iterator();
 +
 +        if (!iter.hasNext())
 +          throw new Exception(" row " + rowToLookup + " not found after 
flush");
 +
 +        Entry<Key,Value> entry = iter.next();
 +
 +        if (iter.hasNext())
 +          throw new Exception("Scanner returned too much");
 +
 +        verifyEntry(rowToLookup, entry);
 +      }
 +
 +      // scan all data just flushed
 +      scanner.setRange(new Range(new Text(String.format("r_%10d", i * 
NUM_TO_FLUSH)), true, new Text(String.format("r_%10d", (i + 1) * 
NUM_TO_FLUSH)), false));
 +      Iterator<Entry<Key,Value>> iter = scanner.iterator();
 +
 +      for (int j = 0; j < NUM_TO_FLUSH; j++) {
 +        int row = i * NUM_TO_FLUSH + j;
 +
 +        if (!iter.hasNext())
 +          throw new Exception("Scan stopped permaturely at " + row);
 +
 +        Entry<Key,Value> entry = iter.next();
 +
 +        verifyEntry(row, entry);
 +      }
 +
 +      if (iter.hasNext())
 +        throw new Exception("Scanner returned too much");
 +
 +    }
 +
 +    bw.close();
 +
 +    // test adding a mutation to a closed batch writer
 +    boolean caught = false;
 +    try {
 +      bw.addMutation(new Mutation(new Text("foobar")));
 +    } catch (IllegalStateException ise) {
 +      caught = true;
 +    }
 +
 +    if (!caught) {
 +      throw new Exception("Adding to closed batch writer did not fail");
 +    }
 +  }
 +
++  @Test
++  public void runMultiThreadedBinningTest() throws Exception {
++    Connector c = getConnector();
++    String[] tableNames = getUniqueNames(1);
++    String tableName = tableNames[0];
++    c.tableOperations().create(tableName);
++    for (int x = 0; x < NUM_THREADS; x++) {
++      c.tableOperations().addSplits(tableName, new 
TreeSet<Text>(Collections.singleton(new Text(Integer.toString(x * 
NUM_TO_FLUSH)))));
++    }
++    c.instanceOperations().waitForBalance();
++
++    // Logger.getLogger(TabletServerBatchWriter.class).setLevel(Level.TRACE);
++    final List<Set<Mutation>> allMuts = new LinkedList<>();
++    List<Mutation> data = new ArrayList<>();
++    for (int i = 0; i < NUM_THREADS; i++) {
++      final int thread = i;
++      for (int j = 0; j < NUM_TO_FLUSH; j++) {
++        int row = thread * NUM_TO_FLUSH + j;
++        Mutation m = new Mutation(new Text(String.format("%10d", row)));
++        m.put(new Text("cf" + thread), new Text("cq"), new Value(("" + 
row).getBytes()));
++        data.add(m);
++      }
++    }
++    Assert.assertEquals(NUM_THREADS * NUM_TO_FLUSH, data.size());
++    Collections.shuffle(data);
++    for (int n = 0; n < (NUM_THREADS * NUM_TO_FLUSH); n += NUM_TO_FLUSH) {
++      Set<Mutation> muts = new HashSet<>(data.subList(n, n + NUM_TO_FLUSH));
++      allMuts.add(muts);
++    }
++
++    SimpleThreadPool threads = new SimpleThreadPool(NUM_THREADS, 
"ClientThreads");
++    threads.allowCoreThreadTimeOut(false);
++    threads.prestartAllCoreThreads();
++
++    BatchWriterConfig cfg = new BatchWriterConfig();
++    cfg.setMaxLatency(10, TimeUnit.SECONDS);
++    cfg.setMaxMemory(1 * 1024 * 1024);
++    cfg.setMaxWriteThreads(NUM_THREADS);
++    final BatchWriter bw = getConnector().createBatchWriter(tableName, cfg);
++
++    for (int k = 0; k < NUM_THREADS; k++) {
++      final int idx = k;
++      threads.execute(new Runnable() {
++        @Override
++        public void run() {
++          try {
++            bw.addMutations(allMuts.get(idx));
++            bw.flush();
++          } catch (MutationsRejectedException e) {
++            Assert.fail("Error adding mutations to batch writer");
++          }
++        }
++      });
++    }
++    threads.shutdown();
++    threads.awaitTermination(3, TimeUnit.MINUTES);
++    bw.close();
++    Scanner scanner = getConnector().createScanner(tableName, 
Authorizations.EMPTY);
++    for (Entry<Key,Value> e : scanner) {
++      Mutation m = new Mutation(e.getKey().getRow());
++      m.put(e.getKey().getColumnFamily(), e.getKey().getColumnQualifier(), 
e.getValue());
++      boolean found = false;
++      for (int l = 0; l < NUM_THREADS; l++) {
++        if (allMuts.get(l).contains(m)) {
++          found = true;
++          allMuts.get(l).remove(m);
++          break;
++        }
++      }
++      Assert.assertTrue("Mutation not found: " + m.toString(), found);
++    }
++
++    for (int m = 0; m < NUM_THREADS; m++) {
++      Assert.assertEquals(0, allMuts.get(m).size());
++    }
++
++  }
++
 +  private void verifyEntry(int row, Entry<Key,Value> entry) throws Exception {
 +    if (!entry.getKey().getRow().toString().equals(String.format("r_%10d", 
row))) {
 +      throw new Exception("Unexpected key returned, expected " + row + " got 
" + entry.getKey());
 +    }
 +
 +    if (!entry.getValue().toString().equals("" + row)) {
 +      throw new Exception("Unexpected value, expected " + row + " got " + 
entry.getValue());
 +    }
 +  }
 +
 +}

Reply via email to