Merge remote-tracking branch 'origin/1.4.5-SNAPSHOT' into 1.5.1-SNAPSHOT

Conflicts:
        README
        bin/config.sh
        docs/combiners.html
        docs/config.html
        pom.xml
        src/core/pom.xml
        src/core/src/main/java/org/apache/accumulo/core/Constants.java
        src/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
        src/examples/pom.xml
        src/examples/simple/pom.xml
        src/examples/wikisearch/ingest/bin/ingest.sh
        src/examples/wikisearch/ingest/bin/ingest_parallel.sh
        src/examples/wikisearch/ingest/pom.xml
        src/examples/wikisearch/pom.xml
        src/examples/wikisearch/query-war/pom.xml
        src/examples/wikisearch/query/pom.xml
        src/minicluster/pom.xml
        src/proxy/pom.xml
        src/server/pom.xml
        src/server/src/main/c++/mlock/Makefile
        src/server/src/main/c++/nativeMap/Makefile
        src/server/src/main/java/org/apache/accumulo/server/logger/LogService.java
        src/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java
        src/server/src/main/resources/config.html
        src/start/pom.xml
        src/trace/pom.xml


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/710c6234
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/710c6234
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/710c6234

Branch: refs/heads/master
Commit: 710c6234b494f57c9f956abd525f26fb2413246b
Parents: 9e15d71 a7aeb88
Author: Eric Newton <e...@apache.org>
Authored: Tue Sep 3 08:57:07 2013 -0400
Committer: Eric Newton <e...@apache.org>
Committed: Tue Sep 3 08:57:07 2013 -0400

----------------------------------------------------------------------
 README                                                             | 1 -
 .../apache/accumulo/core/client/impl/TabletServerBatchWriter.java  | 2 +-
 2 files changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/710c6234/README
----------------------------------------------------------------------
diff --cc README
index 445af37,ebe89e4..2e05aec
--- a/README
+++ b/README
@@@ -76,25 -63,9 +76,24 @@@ the machines in the cluster and that ha
  found in the same location on every machine in the cluster.  You will need to
  have password-less ssh set up as described in the hadoop documentation. 
  
 -You will need to have hadoop installed and configured on your system.
 -Accumulo 1.4.5-SNAPSHOT has been tested with hadoop version 0.20.2.
 +You will need to have hadoop installed and configured on your system.  Accumulo
 +1.5.0 has been tested with hadoop version 1.0.4.  To avoid data loss,
 +you must enable HDFS durable sync.  How you enable this depends on your version
 +of Hadoop. Please consult the table below for information regarding your version.
 +If you need to set the coniguration, please be sure to restart HDFS. See 
 +ACCUMULO-623 for more information.
 +
 +HADOOP RELEASE          VERSION           SYNC NAME             DEFAULT
 +Apache Hadoop           0.20.205          dfs.support.append    false
 +Apache Hadoop            0.23.x           dfs.support.append    true
 +Apache Hadoop             1.0.x           dfs.support.append    false
 +Apache Hadoop             1.1.x           dfs.durable.sync      true
 +Apache Hadoop          2.0.0-2.0.4        dfs.support.append    true
 +Cloudera CDH             3u0-3u3             ????               true
 +Cloudera CDH               3u4            dfs.support.append    true
 +Hortonworks HDP           `1.0            dfs.support.append    false
 +Hortonworks HDP           `1.1            dfs.support.append    false
  
- 
  The example accumulo configuration files are placed in directories based on the 
  memory footprint for the accumulo processes.  If you are using native libraries
  for you tablet server in-memory map, then you can use the files in "native-standalone".

http://git-wip-us.apache.org/repos/asf/accumulo/blob/710c6234/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
index 14bff8c,0000000..ca182a6
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
@@@ -1,1009 -1,0 +1,1009 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.client.impl;
 +
 +import java.io.IOException;
 +import java.lang.management.CompilationMXBean;
 +import java.lang.management.GarbageCollectorMXBean;
 +import java.lang.management.ManagementFactory;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.Timer;
 +import java.util.TimerTask;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicLong;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.MutationsRejectedException;
 +import org.apache.accumulo.core.client.TableDeletedException;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.TableOfflineException;
 +import org.apache.accumulo.core.client.TimedOutException;
 +import 
org.apache.accumulo.core.client.impl.TabletLocator.TabletServerMutations;
 +import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
 +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.constraints.Violations;
 +import org.apache.accumulo.core.data.ConstraintViolationSummary;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.thrift.TMutation;
 +import org.apache.accumulo.core.data.thrift.UpdateErrors;
 +import org.apache.accumulo.core.master.state.tables.TableState;
 +import org.apache.accumulo.core.security.thrift.TCredentials;
 +import 
org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
 +import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
 +import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 +import org.apache.accumulo.core.util.SimpleThreadPool;
 +import org.apache.accumulo.core.util.ThriftUtil;
 +import org.apache.accumulo.trace.instrument.Span;
 +import org.apache.accumulo.trace.instrument.Trace;
 +import org.apache.accumulo.trace.instrument.Tracer;
 +import org.apache.accumulo.trace.thrift.TInfo;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Logger;
 +import org.apache.thrift.TApplicationException;
 +import org.apache.thrift.TException;
 +import org.apache.thrift.TServiceClient;
 +import org.apache.thrift.transport.TTransport;
 +import org.apache.thrift.transport.TTransportException;
 +
 +/*
 + * Differences from previous TabletServerBatchWriter
 + *   + As background threads finish sending mutations to tablet servers they decrement memory usage
 + *   + Once the queue of unprocessed mutations reaches 50% it is always pushed to the background threads, 
 + *      even if they are currently processing... new mutations are merged with mutations currently 
 + *      processing in the background
 + *   + Failed mutations are held for 1000ms and then re-added to the unprocessed queue
 + *   + Flush holds adding of new mutations so it does not wait indefinitely
 + * 
 + * Considerations
 + *   + All background threads must catch and note Throwable
 + *   + mutations for a single tablet server are only processed by one thread concurrently (if new mutations 
 + *      come in for a tablet server while one thread is processing mutations for it, no other thread should 
 + *      start processing those mutations)
 + *   
 + * Memory accounting
 + *   + when a mutation enters the system memory is incremented
 + *   + when a mutation successfully leaves the system memory is decremented
 + * 
 + * 
 + * 
 + */
 +
 +public class TabletServerBatchWriter {
 +  
 +  private static final Logger log = 
Logger.getLogger(TabletServerBatchWriter.class);
 +  
 +  private long totalMemUsed = 0;
 +  private long maxMem;
 +  private MutationSet mutations;
 +  private boolean flushing;
 +  private boolean closed;
 +  private MutationWriter writer;
 +  private FailedMutations failedMutations;
 +  
 +  private Instance instance;
 +  private TCredentials credentials;
 +  
 +  private Violations violations;
 +  private Map<KeyExtent,Set<SecurityErrorCode>> authorizationFailures;
 +  private HashSet<String> serverSideErrors;
 +  private int unknownErrors = 0;
 +  private boolean somethingFailed = false;
 +  
 +  private Timer jtimer;
 +  
 +  private long maxLatency;
 +  
 +  private long timeout;
 +  
 +  private long lastProcessingStartTime;
 +  
 +  private long totalAdded = 0;
 +  private AtomicLong totalSent = new AtomicLong(0);
 +  private AtomicLong totalBinned = new AtomicLong(0);
 +  private AtomicLong totalBinTime = new AtomicLong(0);
 +  private AtomicLong totalSendTime = new AtomicLong(0);
 +  private long startTime = 0;
 +  private long initialGCTimes;
 +  private long initialCompileTimes;
 +  private double initialSystemLoad;
 +  
 +  private int tabletServersBatchSum = 0;
 +  private int tabletBatchSum = 0;
 +  private int numBatches = 0;
 +  private int maxTabletBatch = Integer.MIN_VALUE;
 +  private int minTabletBatch = Integer.MAX_VALUE;
 +  private int minTabletServersBatch = Integer.MAX_VALUE;
 +  private int maxTabletServersBatch = Integer.MIN_VALUE;
 +  
 +  private Throwable lastUnknownError = null;
 +  
 +  private Map<String,TimeoutTracker> timeoutTrackers;
 +  
 +  private static class TimeoutTracker {
 +    
 +    String server;
 +    long timeOut;
 +    long activityTime;
 +    Long firstErrorTime = null;
 +    
 +    TimeoutTracker(String server, long timeOut) {
 +      this.timeOut = timeOut;
 +      this.server = server;
 +    }
 +    
 +    void startingWrite() {
 +      activityTime = System.currentTimeMillis();
 +    }
 +    
 +    void madeProgress() {
 +      activityTime = System.currentTimeMillis();
 +      firstErrorTime = null;
 +    }
 +    
 +    void wroteNothing() {
 +      if (firstErrorTime == null) {
 +        firstErrorTime = activityTime;
 +      } else if (System.currentTimeMillis() - firstErrorTime > timeOut) {
 +        throw new TimedOutException(Collections.singleton(server));
 +      }
 +    }
 +    
 +    void errorOccured(Exception e) {
 +      wroteNothing();
 +    }
 +    
 +    public long getTimeOut() {
 +      return timeOut;
 +    }
 +  }
 +  
 +  public TabletServerBatchWriter(Instance instance, TCredentials credentials, 
BatchWriterConfig config) {
 +    this.instance = instance;
 +    this.maxMem = config.getMaxMemory();
 +    this.maxLatency = config.getMaxLatency(TimeUnit.MILLISECONDS) <= 0 ? 
Long.MAX_VALUE : config.getMaxLatency(TimeUnit.MILLISECONDS);
 +    this.credentials = credentials;
 +    this.timeout = config.getTimeout(TimeUnit.MILLISECONDS);
 +    mutations = new MutationSet();
 +    
 +    violations = new Violations();
 +    
 +    authorizationFailures = new HashMap<KeyExtent,Set<SecurityErrorCode>>();
 +    serverSideErrors = new HashSet<String>();
 +    
 +    lastProcessingStartTime = System.currentTimeMillis();
 +    
 +    jtimer = new Timer("BatchWriterLatencyTimer", true);
 +    
 +    writer = new MutationWriter(config.getMaxWriteThreads());
 +    failedMutations = new FailedMutations();
 +    
 +    timeoutTrackers = Collections.synchronizedMap(new 
HashMap<String,TabletServerBatchWriter.TimeoutTracker>());
 +    
 +    if (this.maxLatency != Long.MAX_VALUE) {
 +      jtimer.schedule(new TimerTask() {
 +        public void run() {
 +          try {
 +            synchronized (TabletServerBatchWriter.this) {
 +              if ((System.currentTimeMillis() - lastProcessingStartTime) > 
TabletServerBatchWriter.this.maxLatency)
 +                startProcessing();
 +            }
 +          } catch (Throwable t) {
 +            updateUnknownErrors("Max latency task failed " + t.getMessage(), 
t);
 +          }
 +        }
 +      }, 0, this.maxLatency / 4);
 +    }
 +  }
 +  
 +  private synchronized void startProcessing() {
 +    if (mutations.getMemoryUsed() == 0)
 +      return;
 +    lastProcessingStartTime = System.currentTimeMillis();
 +    writer.addMutations(mutations);
 +    mutations = new MutationSet();
 +  }
 +  
 +  private synchronized void decrementMemUsed(long amount) {
 +    totalMemUsed -= amount;
 +    this.notifyAll();
 +  }
 +  
 +  public synchronized void addMutation(String table, Mutation m) throws 
MutationsRejectedException {
 +    
 +    if (closed)
 +      throw new IllegalStateException("Closed");
 +    if (m.size() == 0)
 +      throw new IllegalArgumentException("Can not add empty mutations");
 +    
 +    checkForFailures();
 +    
-     while ((totalMemUsed >= maxMem || flushing) && !somethingFailed) {
++    while ((totalMemUsed > maxMem || flushing) && !somethingFailed) {
 +      waitRTE();
 +    }
 +    
 +    // do checks again since things could have changed while waiting and not 
holding lock
 +    if (closed)
 +      throw new IllegalStateException("Closed");
 +    checkForFailures();
 +    
 +    if (startTime == 0) {
 +      startTime = System.currentTimeMillis();
 +      
 +      List<GarbageCollectorMXBean> gcmBeans = 
ManagementFactory.getGarbageCollectorMXBeans();
 +      for (GarbageCollectorMXBean garbageCollectorMXBean : gcmBeans) {
 +        initialGCTimes += garbageCollectorMXBean.getCollectionTime();
 +      }
 +      
 +      CompilationMXBean compMxBean = ManagementFactory.getCompilationMXBean();
 +      if (compMxBean.isCompilationTimeMonitoringSupported()) {
 +        initialCompileTimes = compMxBean.getTotalCompilationTime();
 +      }
 +      
 +      initialSystemLoad = 
ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
 +    }
 +    
 +    // create a copy of mutation so that after this method returns the user
 +    // is free to reuse the mutation object, like calling readFields... this
 +    // is important for the case where a mutation is passed from map to reduce
 +    // to batch writer... the map reduce code will keep passing the same 
mutation
 +    // object into the reduce method
 +    m = new Mutation(m);
 +    
 +    totalMemUsed += m.estimatedMemoryUsed();
 +    mutations.addMutation(table, m);
 +    totalAdded++;
 +    
 +    if (mutations.getMemoryUsed() >= maxMem / 2) {
 +      startProcessing();
 +      checkForFailures();
 +    }
 +  }
 +  
 +  public void addMutation(String table, Iterator<Mutation> iterator) throws 
MutationsRejectedException {
 +    while (iterator.hasNext()) {
 +      addMutation(table, iterator.next());
 +    }
 +  }
 +  
 +  public synchronized void flush() throws MutationsRejectedException {
 +    
 +    if (closed)
 +      throw new IllegalStateException("Closed");
 +    
 +    Span span = Trace.start("flush");
 +    
 +    try {
 +      checkForFailures();
 +      
 +      if (flushing) {
 +        // some other thread is currently flushing, so wait
 +        while (flushing && !somethingFailed)
 +          waitRTE();
 +        
 +        checkForFailures();
 +        
 +        return;
 +      }
 +      
 +      flushing = true;
 +      
 +      startProcessing();
 +      checkForFailures();
 +      
 +      while (totalMemUsed > 0 && !somethingFailed) {
 +        waitRTE();
 +      }
 +      
 +      flushing = false;
 +      this.notifyAll();
 +      
 +      checkForFailures();
 +    } finally {
 +      span.stop();
 +    }
 +  }
 +  
 +  public synchronized void close() throws MutationsRejectedException {
 +    
 +    if (closed)
 +      return;
 +    
 +    Span span = Trace.start("close");
 +    try {
 +      closed = true;
 +      
 +      startProcessing();
 +      
 +      while (totalMemUsed > 0 && !somethingFailed) {
 +        waitRTE();
 +      }
 +      
 +      logStats();
 +      
 +      checkForFailures();
 +    } finally {
 +      // make a best effort to release these resources
 +      writer.sendThreadPool.shutdownNow();
 +      jtimer.cancel();
 +      span.stop();
 +    }
 +  }
 +  
 +  private void logStats() {
 +    long finishTime = System.currentTimeMillis();
 +    
 +    long finalGCTimes = 0;
 +    List<GarbageCollectorMXBean> gcmBeans = 
ManagementFactory.getGarbageCollectorMXBeans();
 +    for (GarbageCollectorMXBean garbageCollectorMXBean : gcmBeans) {
 +      finalGCTimes += garbageCollectorMXBean.getCollectionTime();
 +    }
 +    
 +    CompilationMXBean compMxBean = ManagementFactory.getCompilationMXBean();
 +    long finalCompileTimes = 0;
 +    if (compMxBean.isCompilationTimeMonitoringSupported()) {
 +      finalCompileTimes = compMxBean.getTotalCompilationTime();
 +    }
 +    
 +    double averageRate = totalSent.get() / (totalSendTime.get() / 1000.0);
 +    double overallRate = totalAdded / ((finishTime - startTime) / 1000.0);
 +    
 +    double finalSystemLoad = 
ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
 +    
 +    if (log.isTraceEnabled()) {
 +      log.trace("");
 +      log.trace("TABLET SERVER BATCH WRITER STATISTICS");
 +      log.trace(String.format("Added                : %,10d mutations", 
totalAdded));
 +      log.trace(String.format("Sent                 : %,10d mutations", 
totalSent.get()));
 +      log.trace(String.format("Resent percentage   : %10.2f%s", 
(totalSent.get() - totalAdded) / (double) totalAdded * 100.0, "%"));
 +      log.trace(String.format("Overall time         : %,10.2f secs", 
(finishTime - startTime) / 1000.0));
 +      log.trace(String.format("Overall send rate    : %,10.2f mutations/sec", 
overallRate));
 +      log.trace(String.format("Send efficiency      : %10.2f%s", overallRate 
/ averageRate * 100.0, "%"));
 +      log.trace("");
 +      log.trace("BACKGROUND WRITER PROCESS STATISTICS");
 +      log.trace(String.format("Total send time      : %,10.2f secs %6.2f%s", 
totalSendTime.get() / 1000.0, 100.0 * totalSendTime.get()
 +          / (finishTime - startTime), "%"));
 +      log.trace(String.format("Average send rate    : %,10.2f mutations/sec", 
averageRate));
 +      log.trace(String.format("Total bin time       : %,10.2f secs %6.2f%s", 
totalBinTime.get() / 1000.0,
 +          100.0 * totalBinTime.get() / (finishTime - startTime), "%"));
 +      log.trace(String.format("Average bin rate     : %,10.2f mutations/sec", 
totalBinned.get() / (totalBinTime.get() / 1000.0)));
 +      log.trace(String.format("tservers per batch   : %,8.2f avg  %,6d min 
%,6d max", (tabletServersBatchSum / (double) numBatches), minTabletServersBatch,
 +          maxTabletServersBatch));
 +      log.trace(String.format("tablets per batch    : %,8.2f avg  %,6d min 
%,6d max", (tabletBatchSum / (double) numBatches), minTabletBatch, 
maxTabletBatch));
 +      log.trace("");
 +      log.trace("SYSTEM STATISTICS");
 +      log.trace(String.format("JVM GC Time          : %,10.2f secs", 
((finalGCTimes - initialGCTimes) / 1000.0)));
 +      if (compMxBean.isCompilationTimeMonitoringSupported()) {
 +        log.trace(String.format("JVM Compile Time     : %,10.2f secs", 
((finalCompileTimes - initialCompileTimes) / 1000.0)));
 +      }
 +      log.trace(String.format("System load average : initial=%6.2f 
final=%6.2f", initialSystemLoad, finalSystemLoad));
 +    }
 +  }
 +  
 +  private void updateSendStats(long count, long time) {
 +    totalSent.addAndGet(count);
 +    totalSendTime.addAndGet(time);
 +  }
 +  
 +  public void updateBinningStats(int count, long time, 
Map<String,TabletServerMutations> binnedMutations) {
 +    totalBinTime.addAndGet(time);
 +    totalBinned.addAndGet(count);
 +    updateBatchStats(binnedMutations);
 +  }
 +  
 +  private synchronized void 
updateBatchStats(Map<String,TabletServerMutations> binnedMutations) {
 +    tabletServersBatchSum += binnedMutations.size();
 +    
 +    minTabletServersBatch = Math.min(minTabletServersBatch, 
binnedMutations.size());
 +    maxTabletServersBatch = Math.max(maxTabletServersBatch, 
binnedMutations.size());
 +    
 +    int numTablets = 0;
 +    
 +    for (Entry<String,TabletServerMutations> entry : 
binnedMutations.entrySet()) {
 +      TabletServerMutations tsm = entry.getValue();
 +      numTablets += tsm.getMutations().size();
 +    }
 +    
 +    tabletBatchSum += numTablets;
 +    
 +    minTabletBatch = Math.min(minTabletBatch, numTablets);
 +    maxTabletBatch = Math.max(maxTabletBatch, numTablets);
 +    
 +    numBatches++;
 +  }
 +  
 +  private void waitRTE() {
 +    try {
 +      wait();
 +    } catch (InterruptedException e) {
 +      throw new RuntimeException(e);
 +    }
 +  }
 +  
 +  // BEGIN code for handling unrecoverable errors
 +  
 +  private void updatedConstraintViolations(List<ConstraintViolationSummary> 
cvsList) {
 +    if (cvsList.size() > 0) {
 +      synchronized (this) {
 +        somethingFailed = true;
 +        violations.add(cvsList);
 +        this.notifyAll();
 +      }
 +    }
 +  }
 +
 +  private void updateAuthorizationFailures(Set<KeyExtent> keySet, 
SecurityErrorCode code) {
 +    HashMap<KeyExtent, SecurityErrorCode> map = new HashMap<KeyExtent, 
SecurityErrorCode>();
 +    for (KeyExtent ke : keySet)
 +      map.put(ke, code);
 +    
 +    updateAuthorizationFailures(map);
 +  }
 +  
 +  private void updateAuthorizationFailures(Map<KeyExtent,SecurityErrorCode> 
authorizationFailures) {
 +    if (authorizationFailures.size() > 0) {
 +      
 +      // was a table deleted?
 +      HashSet<String> tableIds = new HashSet<String>();
 +      for (KeyExtent ke : authorizationFailures.keySet())
 +        tableIds.add(ke.getTableId().toString());
 +      
 +      Tables.clearCache(instance);
 +      for (String tableId : tableIds)
 +        if (!Tables.exists(instance, tableId))
 +          throw new TableDeletedException(tableId);
 +      
 +      synchronized (this) {
 +        somethingFailed = true;
 +        mergeAuthorizationFailures(this.authorizationFailures, 
authorizationFailures);
 +        this.notifyAll();
 +      }
 +    }
 +  }
 +  
 +  private void 
mergeAuthorizationFailures(Map<KeyExtent,Set<SecurityErrorCode>> source, 
Map<KeyExtent,SecurityErrorCode> addition) {
 +    for (Entry<KeyExtent,SecurityErrorCode> entry : addition.entrySet()) {
 +      Set<SecurityErrorCode> secs = source.get(entry.getKey());
 +      if (secs == null) {
 +        secs = new HashSet<SecurityErrorCode>();
 +        source.put(entry.getKey(), secs);
 +      }
 +      secs.add(entry.getValue());
 +    }
 +  }
 +  
 +  private synchronized void updateServerErrors(String server, Exception e) {
 +    somethingFailed = true;
 +    this.serverSideErrors.add(server);
 +    this.notifyAll();
 +    log.error("Server side error on " + server);
 +  }
 +  
 +  private synchronized void updateUnknownErrors(String msg, Throwable t) {
 +    somethingFailed = true;
 +    unknownErrors++;
 +    this.lastUnknownError = t;
 +    this.notifyAll();
 +    if (t instanceof TableDeletedException || t instanceof 
TableOfflineException || t instanceof TimedOutException)
 +      log.debug(msg, t); // this is not unknown
 +    else
 +      log.error(msg, t);
 +  }
 +  
 +  private void checkForFailures() throws MutationsRejectedException {
 +    if (somethingFailed) {
 +      List<ConstraintViolationSummary> cvsList = violations.asList();
 +      HashMap<KeyExtent,Set<org.apache.accumulo.core.client.security.SecurityErrorCode>> af = new HashMap<KeyExtent,Set<org.apache.accumulo.core.client.security.SecurityErrorCode>>();
 +      for (Entry<KeyExtent,Set<SecurityErrorCode>> entry : 
authorizationFailures.entrySet()) {
 +        HashSet<org.apache.accumulo.core.client.security.SecurityErrorCode> 
codes = new 
HashSet<org.apache.accumulo.core.client.security.SecurityErrorCode>();
 +        
 +        for (SecurityErrorCode sce : entry.getValue()) {
 +          
codes.add(org.apache.accumulo.core.client.security.SecurityErrorCode.valueOf(sce.name()));
 +        }
 +        
 +        af.put(entry.getKey(), codes);
 +      }
 +      
 +      throw new MutationsRejectedException(cvsList, af, serverSideErrors, 
unknownErrors, lastUnknownError);
 +    }
 +  }
 +  
 +  // END code for handling unrecoverable errors
 +  
 +  // BEGIN code for handling failed mutations
 +  
 +  /**
 +   * Add mutations that previously failed back into the mix
 +   * 
 +   * @param mutationsprivate
 +   *          static final Logger log = Logger.getLogger(TabletServerBatchWriter.class);
 +   */
 +  private synchronized void addFailedMutations(MutationSet failedMutations) 
throws Exception {
 +    mutations.addAll(failedMutations);
 +    if (mutations.getMemoryUsed() >= maxMem / 2 || closed || flushing) {
 +      startProcessing();
 +    }
 +  }
 +  
 +  private class FailedMutations extends TimerTask {
 +    
 +    private MutationSet recentFailures = null;
 +    private long initTime;
 +    
 +    FailedMutations() {
 +      jtimer.schedule(this, 0, 500);
 +    }
 +    
 +    private MutationSet init() {
 +      if (recentFailures == null) {
 +        recentFailures = new MutationSet();
 +        initTime = System.currentTimeMillis();
 +      }
 +      return recentFailures;
 +    }
 +    
 +    synchronized void add(String table, ArrayList<Mutation> tableFailures) {
 +      init().addAll(table, tableFailures);
 +    }
 +    
 +    synchronized void add(MutationSet failures) {
 +      init().addAll(failures);
 +    }
 +    
 +    synchronized void add(String location, TabletServerMutations tsm) {
 +      init();
 +      for (Entry<KeyExtent,List<Mutation>> entry : 
tsm.getMutations().entrySet()) {
 +        recentFailures.addAll(entry.getKey().getTableId().toString(), 
entry.getValue());
 +      }
 +      
 +    }
 +    
 +    @Override
 +    public void run() {
 +      try {
 +        MutationSet rf = null;
 +        
 +        synchronized (this) {
 +          if (recentFailures != null && System.currentTimeMillis() - initTime 
> 1000) {
 +            rf = recentFailures;
 +            recentFailures = null;
 +          }
 +        }
 +        
 +        if (rf != null) {
 +          if (log.isTraceEnabled())
 +            log.trace("requeuing " + rf.size() + " failed mutations");
 +          addFailedMutations(rf);
 +        }
 +      } catch (Throwable t) {
 +        updateUnknownErrors("Failed to requeue failed mutations " + 
t.getMessage(), t);
 +        cancel();
 +      }
 +    }
 +  }
 +  
 +  // END code for handling failed mutations
 +  
 +  // BEGIN code for sending mutations to tablet servers using background 
threads
 +  
 +  private class MutationWriter {
 +    
 +    private static final int MUTATION_BATCH_SIZE = 1 << 17;
 +    private ExecutorService sendThreadPool;
 +    private Map<String,TabletServerMutations> serversMutations;
 +    private Set<String> queued;
 +    private Map<String,TabletLocator> locators;
 +    
 +    public MutationWriter(int numSendThreads) {
 +      serversMutations = new HashMap<String,TabletServerMutations>();
 +      queued = new HashSet<String>();
 +      sendThreadPool = new SimpleThreadPool(numSendThreads, 
this.getClass().getName());
 +      locators = new HashMap<String,TabletLocator>();
 +    }
 +    
 +    private TabletLocator getLocator(String tableId) {
 +      TabletLocator ret = locators.get(tableId);
 +      if (ret == null) {
 +        ret = TabletLocator.getInstance(instance, new Text(tableId));
 +        ret = new TimeoutTabletLocator(ret, timeout);
 +        locators.put(tableId, ret);
 +      }
 +      
 +      return ret;
 +    }
 +    
 +    private void binMutations(MutationSet mutationsToProcess, 
Map<String,TabletServerMutations> binnedMutations) {
 +      try {
 +        Set<Entry<String,List<Mutation>>> es = 
mutationsToProcess.getMutations().entrySet();
 +        for (Entry<String,List<Mutation>> entry : es) {
 +          TabletLocator locator = getLocator(entry.getKey());
 +          
 +          String table = entry.getKey();
 +          List<Mutation> tableMutations = entry.getValue();
 +          
 +          if (tableMutations != null) {
 +            ArrayList<Mutation> tableFailures = new ArrayList<Mutation>();
 +            locator.binMutations(tableMutations, binnedMutations, 
tableFailures, credentials);
 +            
 +            if (tableFailures.size() > 0) {
 +              failedMutations.add(table, tableFailures);
 +              
 +              if (tableFailures.size() == tableMutations.size())
 +                if (!Tables.exists(instance, entry.getKey()))
 +                  throw new TableDeletedException(entry.getKey());
 +                else if (Tables.getTableState(instance, table) == 
TableState.OFFLINE)
 +                  throw new TableOfflineException(instance, entry.getKey());
 +            }
 +          }
 +          
 +        }
 +        return;
 +      } catch (AccumuloServerException ase) {
 +        updateServerErrors(ase.getServer(), ase);
 +      } catch (AccumuloException ae) {
 +        // assume an IOError communicating with !METADATA tablet
 +        failedMutations.add(mutationsToProcess);
 +      } catch (AccumuloSecurityException e) {
 +        updateAuthorizationFailures(Collections.singletonMap(new 
KeyExtent(new Text(Constants.METADATA_TABLE_ID), null, null),
 +            SecurityErrorCode.valueOf(e.getSecurityErrorCode().name())));
 +      } catch (TableDeletedException e) {
 +        updateUnknownErrors(e.getMessage(), e);
 +      } catch (TableOfflineException e) {
 +        updateUnknownErrors(e.getMessage(), e);
 +      } catch (TableNotFoundException e) {
 +        updateUnknownErrors(e.getMessage(), e);
 +      }
 +      
 +      // an error ocurred
 +      binnedMutations.clear();
 +      
 +    }
 +    
 +    /**
 +     * Bins the given mutations by tablet server inside a "binMutations" trace span, reports the elapsed binning time through
 +     * {@code updateBinningStats}, and then hands the binned result to {@link #addMutations(Map)}.
 +     */
 +    void addMutations(MutationSet mutationsToSend) {
 +      Map<String,TabletServerMutations> byServer = new HashMap<String,TabletServerMutations>();
 +      
 +      Span binSpan = Trace.start("binMutations");
 +      try {
 +        long started = System.currentTimeMillis();
 +        binMutations(mutationsToSend, byServer);
 +        long elapsedMillis = System.currentTimeMillis() - started;
 +        
 +        updateBinningStats(mutationsToSend.size(), elapsedMillis, byServer);
 +      } finally {
 +        // the span is closed even when binning throws
 +        binSpan.stop();
 +      }
 +      
 +      addMutations(byServer);
 +    }
 +    
 +    /**
 +     * Merges freshly binned mutations into the per-server pending map and makes sure a {@link SendTask} is queued for every server that received
 +     * mutations and does not already have one queued. Servers are visited in random order when tasks are submitted.
 +     */
 +    private synchronized void addMutations(Map<String,TabletServerMutations> binnedMutations) {
 +      
 +      int traceCount = 0;
 +      
 +      for (Entry<String,TabletServerMutations> binned : binnedMutations.entrySet()) {
 +        String tserver = binned.getKey();
 +        TabletServerMutations pending = serversMutations.get(tserver);
 +        TabletServerMutations incoming = binned.getValue();
 +        
 +        if (pending != null) {
 +          // something is already waiting for this server: fold the new mutations into it, extent by extent
 +          for (Entry<KeyExtent,List<Mutation>> perExtent : incoming.getMutations().entrySet()) {
 +            KeyExtent extent = perExtent.getKey();
 +            for (Mutation mutation : perExtent.getValue()) {
 +              pending.addMutation(extent, mutation);
 +            }
 +          }
 +        } else {
 +          // nothing pending for this server yet, so the incoming batch becomes the pending batch
 +          serversMutations.put(tserver, incoming);
 +        }
 +        
 +        // the tally is only needed for the trace message below
 +        if (log.isTraceEnabled()) {
 +          for (List<Mutation> extentMutations : incoming.getMutations().values()) {
 +            traceCount += extentMutations.size();
 +          }
 +        }
 +      }
 +      
 +      if (traceCount > 0 && log.isTraceEnabled()) {
 +        log.trace(String.format("Started sending %,d mutations to %,d tablet servers", traceCount, binnedMutations.size()));
 +      }
 +      
 +      // visit the servers in random order
 +      ArrayList<String> shuffled = new ArrayList<String>(binnedMutations.keySet());
 +      Collections.shuffle(shuffled);
 +      
 +      for (String tserver : shuffled) {
 +        if (queued.contains(tserver)) {
 +          continue;
 +        }
 +        // submit first, then mark as queued
 +        sendThreadPool.submit(Trace.wrap(new SendTask(tserver)));
 +        queued.add(tserver);
 +      }
 +    }
 +    
 +    /**
 +     * Removes and returns the mutations pending for the given server. When nothing is pending, the server is also removed from the queued
 +     * collection and {@code null} is returned.
 +     */
 +    private synchronized TabletServerMutations getMutationsToSend(String server) {
 +      TabletServerMutations pending = serversMutations.remove(server);
 +      if (pending != null) {
 +        return pending;
 +      }
 +      
 +      // nothing left for this server: clear its queued marker
 +      queued.remove(server);
 +      return null;
 +    }
 +    
 +    /**
 +     * Runnable that drains the mutations pending for one tablet server. It keeps fetching batches with {@code getMutationsToSend} until none
 +     * remain; any throwable that escapes is reported through {@code updateUnknownErrors}.
 +     */
 +    class SendTask implements Runnable {
 +      
 +      private String location;
 +      
 +      SendTask(String server) {
 +        this.location = server;
 +      }
 +      
 +      @Override
 +      public void run() {
 +        try {
 +          for (TabletServerMutations batch = getMutationsToSend(location); batch != null; batch = getMutationsToSend(location)) {
 +            send(batch);
 +          }
 +        } catch (Throwable t) {
 +          updateUnknownErrors("Failed to send tablet server " + location + " its batch : " + t.getMessage(), t);
 +        }
 +      }
 +      
 +      /**
 +       * Sends one batch to this task's server inside a "sendMutations" trace span. The current thread is renamed to describe the send for its
 +       * duration and the previous name is restored afterwards. Failures returned by the server are added to the failed mutations, and the
 +       * estimated bytes of everything that did not fail are passed to {@code decrementMemUsed}. On an {@link IOException} the locator cache
 +       * is invalidated for this server on every table in the batch and the whole batch is added to the failed mutations.
 +       */
 +      public void send(TabletServerMutations tsm) throws AccumuloServerException, AccumuloSecurityException {
 +        
 +        final Thread self = Thread.currentThread();
 +        final String originalThreadName = self.getName();
 +        
 +        final Map<KeyExtent,List<Mutation>> batch = tsm.getMutations();
 +        try {
 +          
 +          long numMutations = 0;
 +          for (List<Mutation> tabletMutations : batch.values()) {
 +            numMutations += tabletMutations.size();
 +          }
 +          self.setName("sending " + String.format("%,d", numMutations) + " mutations to " + String.format("%,d", batch.size()) + " tablets at " + location);
 +          
 +          Span sendSpan = Trace.start("sendMutations");
 +          try {
 +            
 +            TimeoutTracker tracker = timeoutTrackers.get(location);
 +            if (tracker == null) {
 +              tracker = new TimeoutTracker(location, timeout);
 +              timeoutTrackers.put(location, tracker);
 +            }
 +            
 +            long sendStart = System.currentTimeMillis();
 +            MutationSet failures = sendMutationsToTabletServer(location, batch, tracker);
 +            long elapsedMillis = System.currentTimeMillis() - sendStart;
 +            
 +            if (log.isTraceEnabled()) {
 +              double elapsedSecs = elapsedMillis / 1000.0;
 +              log.trace("sent " + String.format("%,d", numMutations) + " mutations to " + location + " in "
 +                  + String.format("%.2f secs (%,.2f mutations/sec) with %,d failures", elapsedSecs, numMutations / elapsedSecs, failures.size()));
 +            }
 +            
 +            // start from the estimated size of the whole batch ...
 +            long sentBytes = 0;
 +            for (List<Mutation> tabletMutations : batch.values()) {
 +              for (Mutation mutation : tabletMutations) {
 +                sentBytes += mutation.estimatedMemoryUsed();
 +              }
 +            }
 +            
 +            // ... and take back whatever failed, since those mutations are kept for another attempt
 +            if (failures.size() > 0) {
 +              failedMutations.add(failures);
 +              sentBytes -= failures.getMemoryUsed();
 +            }
 +            
 +            updateSendStats(numMutations, elapsedMillis);
 +            decrementMemUsed(sentBytes);
 +            
 +          } finally {
 +            sendSpan.stop();
 +          }
 +        } catch (IOException e) {
 +          if (log.isTraceEnabled()) {
 +            log.trace("failed to send mutations to " + location + " : " + e.getMessage());
 +          }
 +          
 +          // collect each table id once, then drop the cached locations that point at this server
 +          Set<String> tableIds = new HashSet<String>();
 +          for (KeyExtent extent : batch.keySet()) {
 +            tableIds.add(extent.getTableId().toString());
 +          }
 +          for (String tableId : tableIds) {
 +            TabletLocator.getInstance(instance, new Text(tableId)).invalidateCache(location);
 +          }
 +          
 +          failedMutations.add(location, tsm);
 +        } finally {
 +          self.setName(originalThreadName);
 +        }
 +      }
 +    }
 +    
 +    /**
 +     * Sends the given per-tablet mutations to the tablet server at {@code location} and returns the mutations that were not accepted.
 +     * 
 +     * An empty {@code tabMuts} returns an empty set without contacting the server. A single mutation for a single tablet is sent with one
 +     * {@code update} call; anything else goes through {@code startUpdate} / {@code applyUpdates} / {@code closeUpdate}, with each tablet's
 +     * mutations sent in chunks that are cut once the accumulated {@code numBytes()} reaches {@code MUTATION_BATCH_SIZE}.
 +     * 
 +     * Constraint violations and authorization failures reported by the server are recorded through the corresponding update methods and are
 +     * not part of the returned set. Extents reported as failed have their locator cache entry invalidated.
 +     * 
 +     * @param location
 +     *          tablet server to send to
 +     * @param tabMuts
 +     *          mutations to send, keyed by tablet extent
 +     * @param timeoutTracker
 +     *          notified when the write starts and of its outcome (progress, nothing written, or transport error)
 +     * @return the mutations that should be retried, grouped by table id
 +     * @throws IOException
 +     *           on a transport error or any other Thrift exception not mapped below
 +     * @throws AccumuloSecurityException
 +     *           when the server reports a Thrift security exception; the failure is also recorded for every extent in {@code tabMuts}
 +     * @throws AccumuloServerException
 +     *           when the call fails with a Thrift application exception; the error is also recorded against {@code location}
 +     */
 +    private MutationSet sendMutationsToTabletServer(String location, 
Map<KeyExtent,List<Mutation>> tabMuts, TimeoutTracker timeoutTracker) throws 
IOException,
 +        AccumuloSecurityException, AccumuloServerException {
 +      if (tabMuts.size() == 0) {
 +        return new MutationSet();
 +      }
 +      TInfo tinfo = Tracer.traceInfo();
 +      TTransport transport = null;
 +      
 +      timeoutTracker.startingWrite();
 +      
 +      try {
 +        TabletClientService.Iface client;
 +        // use the tracker's timeout only when it is shorter than the general RPC timeout
 +        if (timeoutTracker.getTimeOut() < 
instance.getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT))
 +          client = ThriftUtil.getTServerClient(location, 
instance.getConfiguration(), timeoutTracker.getTimeOut());
 +        else
 +          client = ThriftUtil.getTServerClient(location, 
instance.getConfiguration());
 +        
 +        try {
 +          MutationSet allFailures = new MutationSet();
 +          // exactly one mutation for exactly one tablet: send it with a single update call
 +          if (tabMuts.size() == 1 && 
tabMuts.values().iterator().next().size() == 1) {
 +            Entry<KeyExtent,List<Mutation>> entry = 
tabMuts.entrySet().iterator().next();
 +            
 +            try {
 +              client.update(tinfo, credentials, entry.getKey().toThrift(), 
entry.getValue().get(0).toThrift());
 +            } catch (NotServingTabletException e) {
 +              allFailures.addAll(entry.getKey().getTableId().toString(), 
entry.getValue());
 +              TabletLocator.getInstance(instance, new 
Text(entry.getKey().getTableId())).invalidateCache(entry.getKey());
 +            } catch (ConstraintViolationException e) {
 +              
updatedConstraintViolations(Translator.translate(e.violationSummaries, 
Translator.TCVST));
 +            }
 +            timeoutTracker.madeProgress();
 +          } else {
 +            // otherwise start an update (usid), apply the mutations tablet by tablet in chunks, then close it
 +            long usid = client.startUpdate(tinfo, credentials);
 +            
 +            List<TMutation> updates = new ArrayList<TMutation>();
 +            for (Entry<KeyExtent,List<Mutation>> entry : tabMuts.entrySet()) {
 +              long size = 0;
 +              Iterator<Mutation> iter = entry.getValue().iterator();
 +              while (iter.hasNext()) {
 +                while (size < MUTATION_BATCH_SIZE && iter.hasNext()) {
 +                  Mutation mutation = iter.next();
 +                  updates.add(mutation.toThrift());
 +                  size += mutation.numBytes();
 +                }
 +                
 +                client.applyUpdates(tinfo, usid, entry.getKey().toThrift(), 
updates);
 +                updates.clear();
 +                size = 0;
 +              }
 +            }
 +            
 +            UpdateErrors updateErrors = client.closeUpdate(tinfo, usid);
 +            
 +            Map<KeyExtent,Long> failures = 
Translator.translate(updateErrors.failedExtents, Translator.TKET);
 +            
updatedConstraintViolations(Translator.translate(updateErrors.violationSummaries,
 Translator.TCVST));
 +            
updateAuthorizationFailures(Translator.translate(updateErrors.authorizationFailures,
 Translator.TKET));
 +            
 +            long totalCommitted = 0;
 +            // the value reported per failed extent is used as the number of mutations committed; the remainder is returned as failed
 +            for (Entry<KeyExtent,Long> entry : failures.entrySet()) {
 +              KeyExtent failedExtent = entry.getKey();
 +              int numCommitted = (int) (long) entry.getValue();
 +              totalCommitted += numCommitted;
 +              
 +              String table = failedExtent.getTableId().toString();
 +              
 +              TabletLocator.getInstance(instance, new 
Text(table)).invalidateCache(failedExtent);
 +              
 +              ArrayList<Mutation> mutations = (ArrayList<Mutation>) 
tabMuts.get(failedExtent);
 +              allFailures.addAll(table, mutations.subList(numCommitted, 
mutations.size()));
 +            }
 +            
 +            if (failures.keySet().containsAll(tabMuts.keySet()) && 
totalCommitted == 0) {
 +              // nothing was successfully written
 +              timeoutTracker.wroteNothing();
 +            } else {
 +              // successfully wrote something to tablet server
 +              timeoutTracker.madeProgress();
 +            }
 +          }
 +          return allFailures;
 +        } finally {
 +          ThriftUtil.returnClient((TServiceClient) client);
 +        }
 +      } catch (TTransportException e) {
 +        timeoutTracker.errorOccured(e);
 +        throw new IOException(e);
 +      } catch (TApplicationException tae) {
 +        updateServerErrors(location, tae);
 +        throw new AccumuloServerException(location, tae);
 +      } catch (ThriftSecurityException e) {
 +        updateAuthorizationFailures(tabMuts.keySet(), e.code);
 +        throw new AccumuloSecurityException(e.user, e.code, e);
 +      } catch (NoSuchScanIDException e) {
 +        throw new IOException(e);
 +      } catch (TException e) {
 +        throw new IOException(e);
 +      } finally {
 +        ThriftTransportPool.getInstance().returnTransport(transport); // NOTE(review): transport is never assigned in this method, so null is passed here - confirm this is intended
 +      }
 +    }
 +  }
 +  
 +  // END code for sending mutations to tablet servers using background threads
 +  
 +  /**
 +   * Mutations grouped by the table string supplied by the caller, together with a running total of their estimated memory usage.
 +   */
 +  private static class MutationSet {
 +    
 +    // table -> mutations added for that table, in the order they were added
 +    private HashMap<String,List<Mutation>> mutations = new HashMap<String,List<Mutation>>();
 +    // sum of estimatedMemoryUsed() over every mutation added so far
 +    private int memoryUsed = 0;
 +    
 +    MutationSet() {}
 +    
 +    /**
 +     * Appends one mutation to the list kept for {@code table}, creating that list on first use, and adds the mutation's estimated memory usage
 +     * to the running total.
 +     */
 +    void addMutation(String table, Mutation mutation) {
 +      List<Mutation> forTable = mutations.get(table);
 +      if (forTable == null) {
 +        forTable = new ArrayList<Mutation>();
 +        mutations.put(table, forTable);
 +      }
 +      forTable.add(mutation);
 +      
 +      memoryUsed += mutation.estimatedMemoryUsed();
 +    }
 +    
 +    /**
 +     * @return the backing table-to-mutations map itself, not a copy
 +     */
 +    Map<String,List<Mutation>> getMutations() {
 +      return mutations;
 +    }
 +    
 +    /**
 +     * @return the total number of mutations across all tables
 +     */
 +    int size() {
 +      int total = 0;
 +      for (List<Mutation> forTable : mutations.values()) {
 +        total += forTable.size();
 +      }
 +      return total;
 +    }
 +    
 +    /**
 +     * Adds every mutation of {@code failures} to this set under the same table.
 +     */
 +    public void addAll(MutationSet failures) {
 +      for (Entry<String,List<Mutation>> perTable : failures.getMutations().entrySet()) {
 +        addAll(perTable.getKey(), perTable.getValue());
 +      }
 +    }
 +    
 +    /**
 +     * Adds each of the given mutations to this set under {@code table}.
 +     */
 +    public void addAll(String table, List<Mutation> mutations) {
 +      for (Mutation each : mutations) {
 +        addMutation(table, each);
 +      }
 +    }
 +    
 +    /**
 +     * @return the running total of estimated memory used by the mutations added to this set
 +     */
 +    public int getMemoryUsed() {
 +      return memoryUsed;
 +    }
 +    
 +  }
 +}

Reply via email to