Merge remote-tracking branch 'origin/1.4.5-SNAPSHOT' into 1.5.1-SNAPSHOT Conflicts: README bin/config.sh docs/combiners.html docs/config.html pom.xml src/core/pom.xml src/core/src/main/java/org/apache/accumulo/core/Constants.java src/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java src/examples/pom.xml src/examples/simple/pom.xml src/examples/wikisearch/ingest/bin/ingest.sh src/examples/wikisearch/ingest/bin/ingest_parallel.sh src/examples/wikisearch/ingest/pom.xml src/examples/wikisearch/pom.xml src/examples/wikisearch/query-war/pom.xml src/examples/wikisearch/query/pom.xml src/minicluster/pom.xml src/proxy/pom.xml src/server/pom.xml src/server/src/main/c++/mlock/Makefile src/server/src/main/c++/nativeMap/Makefile src/server/src/main/java/org/apache/accumulo/server/logger/LogService.java src/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java src/server/src/main/resources/config.html src/start/pom.xml src/trace/pom.xml
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/710c6234 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/710c6234 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/710c6234 Branch: refs/heads/master Commit: 710c6234b494f57c9f956abd525f26fb2413246b Parents: 9e15d71 a7aeb88 Author: Eric Newton <e...@apache.org> Authored: Tue Sep 3 08:57:07 2013 -0400 Committer: Eric Newton <e...@apache.org> Committed: Tue Sep 3 08:57:07 2013 -0400 ---------------------------------------------------------------------- README | 1 - .../apache/accumulo/core/client/impl/TabletServerBatchWriter.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/710c6234/README ---------------------------------------------------------------------- diff --cc README index 445af37,ebe89e4..2e05aec --- a/README +++ b/README @@@ -76,25 -63,9 +76,24 @@@ the machines in the cluster and that ha found in the same location on every machine in the cluster. You will need to have password-less ssh set up as described in the hadoop documentation. -You will need to have hadoop installed and configured on your system. -Accumulo 1.4.5-SNAPSHOT has been tested with hadoop version 0.20.2. +You will need to have hadoop installed and configured on your system. Accumulo +1.5.0 has been tested with hadoop version 1.0.4. To avoid data loss, +you must enable HDFS durable sync. How you enable this depends on your version +of Hadoop. Please consult the table below for information regarding your version. +If you need to set the coniguration, please be sure to restart HDFS. See +ACCUMULO-623 for more information. + +HADOOP RELEASE VERSION SYNC NAME DEFAULT +Apache Hadoop 0.20.205 dfs.support.append false +Apache Hadoop 0.23.x dfs.support.append true +Apache Hadoop 1.0.x dfs.support.append false +Apache Hadoop 1.1.x dfs.durable.sync true +Apache Hadoop 2.0.0-2.0.4 dfs.support.append true +Cloudera CDH 3u0-3u3 ???? true +Cloudera CDH 3u4 dfs.support.append true +Hortonworks HDP `1.0 dfs.support.append false +Hortonworks HDP `1.1 dfs.support.append false - The example accumulo configuration files are placed in directories based on the memory footprint for the accumulo processes. If you are using native libraries for you tablet server in-memory map, then you can use the files in "native-standalone". http://git-wip-us.apache.org/repos/asf/accumulo/blob/710c6234/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java index 14bff8c,0000000..ca182a6 mode 100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java @@@ -1,1009 -1,0 +1,1009 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client.impl; + +import java.io.IOException; +import java.lang.management.CompilationMXBean; +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.TableDeletedException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.TableOfflineException; +import org.apache.accumulo.core.client.TimedOutException; +import org.apache.accumulo.core.client.impl.TabletLocator.TabletServerMutations; +import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode; +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.constraints.Violations; +import org.apache.accumulo.core.data.ConstraintViolationSummary; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.thrift.TMutation; +import org.apache.accumulo.core.data.thrift.UpdateErrors; +import org.apache.accumulo.core.master.state.tables.TableState; +import org.apache.accumulo.core.security.thrift.TCredentials; +import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException; +import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; +import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; +import org.apache.accumulo.core.util.SimpleThreadPool; +import org.apache.accumulo.core.util.ThriftUtil; +import org.apache.accumulo.trace.instrument.Span; +import org.apache.accumulo.trace.instrument.Trace; +import org.apache.accumulo.trace.instrument.Tracer; +import org.apache.accumulo.trace.thrift.TInfo; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; +import org.apache.thrift.TApplicationException; +import org.apache.thrift.TException; +import org.apache.thrift.TServiceClient; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +/* + * Differences from previous TabletServerBatchWriter + * + As background threads finish sending mutations to tablet servers they decrement memory usage + * + Once the queue of unprocessed mutations reaches 50% it is always pushed to the background threads, + * even if they are currently processing... new mutations are merged with mutations currently + * processing in the background + * + Failed mutations are held for 1000ms and then re-added to the unprocessed queue + * + Flush holds adding of new mutations so it does not wait indefinitely + * + * Considerations + * + All background threads must catch and note Throwable + * + mutations for a single tablet server are only processed by one thread concurrently (if new mutations + * come in for a tablet server while one thread is processing mutations for it, no other thread should + * start processing those mutations) + * + * Memory accounting + * + when a mutation enters the system memory is incremented + * + when a mutation successfully leaves the system memory is decremented + * + * + * + */ + +public class TabletServerBatchWriter { + + private static final Logger log = Logger.getLogger(TabletServerBatchWriter.class); + + private long totalMemUsed = 0; + private long maxMem; + private MutationSet mutations; + private boolean flushing; + private boolean closed; + private MutationWriter writer; + private FailedMutations failedMutations; + + private Instance instance; + private TCredentials credentials; + + private Violations violations; + private Map<KeyExtent,Set<SecurityErrorCode>> authorizationFailures; + private HashSet<String> serverSideErrors; + private int unknownErrors = 0; + private boolean somethingFailed = false; + + private Timer jtimer; + + private long maxLatency; + + private long timeout; + + private long lastProcessingStartTime; + + private long totalAdded = 0; + private AtomicLong totalSent = new AtomicLong(0); + private AtomicLong totalBinned = new AtomicLong(0); + private AtomicLong totalBinTime = new AtomicLong(0); + private AtomicLong totalSendTime = new AtomicLong(0); + private long startTime = 0; + private long initialGCTimes; + private long initialCompileTimes; + private double initialSystemLoad; + + private int tabletServersBatchSum = 0; + private int tabletBatchSum = 0; + private int numBatches = 0; + private int maxTabletBatch = Integer.MIN_VALUE; + private int minTabletBatch = Integer.MAX_VALUE; + private int minTabletServersBatch = Integer.MAX_VALUE; + private int maxTabletServersBatch = Integer.MIN_VALUE; + + private Throwable lastUnknownError = null; + + private Map<String,TimeoutTracker> timeoutTrackers; + + private static class TimeoutTracker { + + String server; + long timeOut; + long activityTime; + Long firstErrorTime = null; + + TimeoutTracker(String server, long timeOut) { + this.timeOut = timeOut; + this.server = server; + } + + void startingWrite() { + activityTime = System.currentTimeMillis(); + } + + void madeProgress() { + activityTime = System.currentTimeMillis(); + firstErrorTime = null; + } + + void wroteNothing() { + if (firstErrorTime == null) { + firstErrorTime = activityTime; + } else if (System.currentTimeMillis() - firstErrorTime > timeOut) { + throw new TimedOutException(Collections.singleton(server)); + } + } + + void errorOccured(Exception e) { + wroteNothing(); + } + + public long getTimeOut() { + return timeOut; + } + } + + public TabletServerBatchWriter(Instance instance, TCredentials credentials, BatchWriterConfig config) { + this.instance = instance; + this.maxMem = config.getMaxMemory(); + this.maxLatency = config.getMaxLatency(TimeUnit.MILLISECONDS) <= 0 ? Long.MAX_VALUE : config.getMaxLatency(TimeUnit.MILLISECONDS); + this.credentials = credentials; + this.timeout = config.getTimeout(TimeUnit.MILLISECONDS); + mutations = new MutationSet(); + + violations = new Violations(); + + authorizationFailures = new HashMap<KeyExtent,Set<SecurityErrorCode>>(); + serverSideErrors = new HashSet<String>(); + + lastProcessingStartTime = System.currentTimeMillis(); + + jtimer = new Timer("BatchWriterLatencyTimer", true); + + writer = new MutationWriter(config.getMaxWriteThreads()); + failedMutations = new FailedMutations(); + + timeoutTrackers = Collections.synchronizedMap(new HashMap<String,TabletServerBatchWriter.TimeoutTracker>()); + + if (this.maxLatency != Long.MAX_VALUE) { + jtimer.schedule(new TimerTask() { + public void run() { + try { + synchronized (TabletServerBatchWriter.this) { + if ((System.currentTimeMillis() - lastProcessingStartTime) > TabletServerBatchWriter.this.maxLatency) + startProcessing(); + } + } catch (Throwable t) { + updateUnknownErrors("Max latency task failed " + t.getMessage(), t); + } + } + }, 0, this.maxLatency / 4); + } + } + + private synchronized void startProcessing() { + if (mutations.getMemoryUsed() == 0) + return; + lastProcessingStartTime = System.currentTimeMillis(); + writer.addMutations(mutations); + mutations = new MutationSet(); + } + + private synchronized void decrementMemUsed(long amount) { + totalMemUsed -= amount; + this.notifyAll(); + } + + public synchronized void addMutation(String table, Mutation m) throws MutationsRejectedException { + + if (closed) + throw new IllegalStateException("Closed"); + if (m.size() == 0) + throw new IllegalArgumentException("Can not add empty mutations"); + + checkForFailures(); + - while ((totalMemUsed >= maxMem || flushing) && !somethingFailed) { ++ while ((totalMemUsed > maxMem || flushing) && !somethingFailed) { + waitRTE(); + } + + // do checks again since things could have changed while waiting and not holding lock + if (closed) + throw new IllegalStateException("Closed"); + checkForFailures(); + + if (startTime == 0) { + startTime = System.currentTimeMillis(); + + List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans(); + for (GarbageCollectorMXBean garbageCollectorMXBean : gcmBeans) { + initialGCTimes += garbageCollectorMXBean.getCollectionTime(); + } + + CompilationMXBean compMxBean = ManagementFactory.getCompilationMXBean(); + if (compMxBean.isCompilationTimeMonitoringSupported()) { + initialCompileTimes = compMxBean.getTotalCompilationTime(); + } + + initialSystemLoad = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage(); + } + + // create a copy of mutation so that after this method returns the user + // is free to reuse the mutation object, like calling readFields... this + // is important for the case where a mutation is passed from map to reduce + // to batch writer... the map reduce code will keep passing the same mutation + // object into the reduce method + m = new Mutation(m); + + totalMemUsed += m.estimatedMemoryUsed(); + mutations.addMutation(table, m); + totalAdded++; + + if (mutations.getMemoryUsed() >= maxMem / 2) { + startProcessing(); + checkForFailures(); + } + } + + public void addMutation(String table, Iterator<Mutation> iterator) throws MutationsRejectedException { + while (iterator.hasNext()) { + addMutation(table, iterator.next()); + } + } + + public synchronized void flush() throws MutationsRejectedException { + + if (closed) + throw new IllegalStateException("Closed"); + + Span span = Trace.start("flush"); + + try { + checkForFailures(); + + if (flushing) { + // some other thread is currently flushing, so wait + while (flushing && !somethingFailed) + waitRTE(); + + checkForFailures(); + + return; + } + + flushing = true; + + startProcessing(); + checkForFailures(); + + while (totalMemUsed > 0 && !somethingFailed) { + waitRTE(); + } + + flushing = false; + this.notifyAll(); + + checkForFailures(); + } finally { + span.stop(); + } + } + + public synchronized void close() throws MutationsRejectedException { + + if (closed) + return; + + Span span = Trace.start("close"); + try { + closed = true; + + startProcessing(); + + while (totalMemUsed > 0 && !somethingFailed) { + waitRTE(); + } + + logStats(); + + checkForFailures(); + } finally { + // make a best effort to release these resources + writer.sendThreadPool.shutdownNow(); + jtimer.cancel(); + span.stop(); + } + } + + private void logStats() { + long finishTime = System.currentTimeMillis(); + + long finalGCTimes = 0; + List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans(); + for (GarbageCollectorMXBean garbageCollectorMXBean : gcmBeans) { + finalGCTimes += garbageCollectorMXBean.getCollectionTime(); + } + + CompilationMXBean compMxBean = ManagementFactory.getCompilationMXBean(); + long finalCompileTimes = 0; + if (compMxBean.isCompilationTimeMonitoringSupported()) { + finalCompileTimes = compMxBean.getTotalCompilationTime(); + } + + double averageRate = totalSent.get() / (totalSendTime.get() / 1000.0); + double overallRate = totalAdded / ((finishTime - startTime) / 1000.0); + + double finalSystemLoad = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage(); + + if (log.isTraceEnabled()) { + log.trace(""); + log.trace("TABLET SERVER BATCH WRITER STATISTICS"); + log.trace(String.format("Added : %,10d mutations", totalAdded)); + log.trace(String.format("Sent : %,10d mutations", totalSent.get())); + log.trace(String.format("Resent percentage : %10.2f%s", (totalSent.get() - totalAdded) / (double) totalAdded * 100.0, "%")); + log.trace(String.format("Overall time : %,10.2f secs", (finishTime - startTime) / 1000.0)); + log.trace(String.format("Overall send rate : %,10.2f mutations/sec", overallRate)); + log.trace(String.format("Send efficiency : %10.2f%s", overallRate / averageRate * 100.0, "%")); + log.trace(""); + log.trace("BACKGROUND WRITER PROCESS STATISTICS"); + log.trace(String.format("Total send time : %,10.2f secs %6.2f%s", totalSendTime.get() / 1000.0, 100.0 * totalSendTime.get() + / (finishTime - startTime), "%")); + log.trace(String.format("Average send rate : %,10.2f mutations/sec", averageRate)); + log.trace(String.format("Total bin time : %,10.2f secs %6.2f%s", totalBinTime.get() / 1000.0, + 100.0 * totalBinTime.get() / (finishTime - startTime), "%")); + log.trace(String.format("Average bin rate : %,10.2f mutations/sec", totalBinned.get() / (totalBinTime.get() / 1000.0))); + log.trace(String.format("tservers per batch : %,8.2f avg %,6d min %,6d max", (tabletServersBatchSum / (double) numBatches), minTabletServersBatch, + maxTabletServersBatch)); + log.trace(String.format("tablets per batch : %,8.2f avg %,6d min %,6d max", (tabletBatchSum / (double) numBatches), minTabletBatch, maxTabletBatch)); + log.trace(""); + log.trace("SYSTEM STATISTICS"); + log.trace(String.format("JVM GC Time : %,10.2f secs", ((finalGCTimes - initialGCTimes) / 1000.0))); + if (compMxBean.isCompilationTimeMonitoringSupported()) { + log.trace(String.format("JVM Compile Time : %,10.2f secs", ((finalCompileTimes - initialCompileTimes) / 1000.0))); + } + log.trace(String.format("System load average : initial=%6.2f final=%6.2f", initialSystemLoad, finalSystemLoad)); + } + } + + private void updateSendStats(long count, long time) { + totalSent.addAndGet(count); + totalSendTime.addAndGet(time); + } + + public void updateBinningStats(int count, long time, Map<String,TabletServerMutations> binnedMutations) { + totalBinTime.addAndGet(time); + totalBinned.addAndGet(count); + updateBatchStats(binnedMutations); + } + + private synchronized void updateBatchStats(Map<String,TabletServerMutations> binnedMutations) { + tabletServersBatchSum += binnedMutations.size(); + + minTabletServersBatch = Math.min(minTabletServersBatch, binnedMutations.size()); + maxTabletServersBatch = Math.max(maxTabletServersBatch, binnedMutations.size()); + + int numTablets = 0; + + for (Entry<String,TabletServerMutations> entry : binnedMutations.entrySet()) { + TabletServerMutations tsm = entry.getValue(); + numTablets += tsm.getMutations().size(); + } + + tabletBatchSum += numTablets; + + minTabletBatch = Math.min(minTabletBatch, numTablets); + maxTabletBatch = Math.max(maxTabletBatch, numTablets); + + numBatches++; + } + + private void waitRTE() { + try { + wait(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + // BEGIN code for handling unrecoverable errors + + private void updatedConstraintViolations(List<ConstraintViolationSummary> cvsList) { + if (cvsList.size() > 0) { + synchronized (this) { + somethingFailed = true; + violations.add(cvsList); + this.notifyAll(); + } + } + } + + private void updateAuthorizationFailures(Set<KeyExtent> keySet, SecurityErrorCode code) { + HashMap<KeyExtent, SecurityErrorCode> map = new HashMap<KeyExtent, SecurityErrorCode>(); + for (KeyExtent ke : keySet) + map.put(ke, code); + + updateAuthorizationFailures(map); + } + + private void updateAuthorizationFailures(Map<KeyExtent,SecurityErrorCode> authorizationFailures) { + if (authorizationFailures.size() > 0) { + + // was a table deleted? + HashSet<String> tableIds = new HashSet<String>(); + for (KeyExtent ke : authorizationFailures.keySet()) + tableIds.add(ke.getTableId().toString()); + + Tables.clearCache(instance); + for (String tableId : tableIds) + if (!Tables.exists(instance, tableId)) + throw new TableDeletedException(tableId); + + synchronized (this) { + somethingFailed = true; + mergeAuthorizationFailures(this.authorizationFailures, authorizationFailures); + this.notifyAll(); + } + } + } + + private void mergeAuthorizationFailures(Map<KeyExtent,Set<SecurityErrorCode>> source, Map<KeyExtent,SecurityErrorCode> addition) { + for (Entry<KeyExtent,SecurityErrorCode> entry : addition.entrySet()) { + Set<SecurityErrorCode> secs = source.get(entry.getKey()); + if (secs == null) { + secs = new HashSet<SecurityErrorCode>(); + source.put(entry.getKey(), secs); + } + secs.add(entry.getValue()); + } + } + + private synchronized void updateServerErrors(String server, Exception e) { + somethingFailed = true; + this.serverSideErrors.add(server); + this.notifyAll(); + log.error("Server side error on " + server); + } + + private synchronized void updateUnknownErrors(String msg, Throwable t) { + somethingFailed = true; + unknownErrors++; + this.lastUnknownError = t; + this.notifyAll(); + if (t instanceof TableDeletedException || t instanceof TableOfflineException || t instanceof TimedOutException) + log.debug(msg, t); // this is not unknown + else + log.error(msg, t); + } + + private void checkForFailures() throws MutationsRejectedException { + if (somethingFailed) { + List<ConstraintViolationSummary> cvsList = violations.asList(); + HashMap<KeyExtent,Set<org.apache.accumulo.core.client.security.SecurityErrorCode>> af = new HashMap<KeyExtent,Set<org.apache.accumulo.core.client.security.SecurityErrorCode>>(); + for (Entry<KeyExtent,Set<SecurityErrorCode>> entry : authorizationFailures.entrySet()) { + HashSet<org.apache.accumulo.core.client.security.SecurityErrorCode> codes = new HashSet<org.apache.accumulo.core.client.security.SecurityErrorCode>(); + + for (SecurityErrorCode sce : entry.getValue()) { + codes.add(org.apache.accumulo.core.client.security.SecurityErrorCode.valueOf(sce.name())); + } + + af.put(entry.getKey(), codes); + } + + throw new MutationsRejectedException(cvsList, af, serverSideErrors, unknownErrors, lastUnknownError); + } + } + + // END code for handling unrecoverable errors + + // BEGIN code for handling failed mutations + + /** + * Add mutations that previously failed back into the mix + * + * @param mutationsprivate + * static final Logger log = Logger.getLogger(TabletServerBatchWriter.class); + */ + private synchronized void addFailedMutations(MutationSet failedMutations) throws Exception { + mutations.addAll(failedMutations); + if (mutations.getMemoryUsed() >= maxMem / 2 || closed || flushing) { + startProcessing(); + } + } + + private class FailedMutations extends TimerTask { + + private MutationSet recentFailures = null; + private long initTime; + + FailedMutations() { + jtimer.schedule(this, 0, 500); + } + + private MutationSet init() { + if (recentFailures == null) { + recentFailures = new MutationSet(); + initTime = System.currentTimeMillis(); + } + return recentFailures; + } + + synchronized void add(String table, ArrayList<Mutation> tableFailures) { + init().addAll(table, tableFailures); + } + + synchronized void add(MutationSet failures) { + init().addAll(failures); + } + + synchronized void add(String location, TabletServerMutations tsm) { + init(); + for (Entry<KeyExtent,List<Mutation>> entry : tsm.getMutations().entrySet()) { + recentFailures.addAll(entry.getKey().getTableId().toString(), entry.getValue()); + } + + } + + @Override + public void run() { + try { + MutationSet rf = null; + + synchronized (this) { + if (recentFailures != null && System.currentTimeMillis() - initTime > 1000) { + rf = recentFailures; + recentFailures = null; + } + } + + if (rf != null) { + if (log.isTraceEnabled()) + log.trace("requeuing " + rf.size() + " failed mutations"); + addFailedMutations(rf); + } + } catch (Throwable t) { + updateUnknownErrors("Failed to requeue failed mutations " + t.getMessage(), t); + cancel(); + } + } + } + + // END code for handling failed mutations + + // BEGIN code for sending mutations to tablet servers using background threads + + private class MutationWriter { + + private static final int MUTATION_BATCH_SIZE = 1 << 17; + private ExecutorService sendThreadPool; + private Map<String,TabletServerMutations> serversMutations; + private Set<String> queued; + private Map<String,TabletLocator> locators; + + public MutationWriter(int numSendThreads) { + serversMutations = new HashMap<String,TabletServerMutations>(); + queued = new HashSet<String>(); + sendThreadPool = new SimpleThreadPool(numSendThreads, this.getClass().getName()); + locators = new HashMap<String,TabletLocator>(); + } + + private TabletLocator getLocator(String tableId) { + TabletLocator ret = locators.get(tableId); + if (ret == null) { + ret = TabletLocator.getInstance(instance, new Text(tableId)); + ret = new TimeoutTabletLocator(ret, timeout); + locators.put(tableId, ret); + } + + return ret; + } + + private void binMutations(MutationSet mutationsToProcess, Map<String,TabletServerMutations> binnedMutations) { + try { + Set<Entry<String,List<Mutation>>> es = mutationsToProcess.getMutations().entrySet(); + for (Entry<String,List<Mutation>> entry : es) { + TabletLocator locator = getLocator(entry.getKey()); + + String table = entry.getKey(); + List<Mutation> tableMutations = entry.getValue(); + + if (tableMutations != null) { + ArrayList<Mutation> tableFailures = new ArrayList<Mutation>(); + locator.binMutations(tableMutations, binnedMutations, tableFailures, credentials); + + if (tableFailures.size() > 0) { + failedMutations.add(table, tableFailures); + + if (tableFailures.size() == tableMutations.size()) + if (!Tables.exists(instance, entry.getKey())) + throw new TableDeletedException(entry.getKey()); + else if (Tables.getTableState(instance, table) == TableState.OFFLINE) + throw new TableOfflineException(instance, entry.getKey()); + } + } + + } + return; + } catch (AccumuloServerException ase) { + updateServerErrors(ase.getServer(), ase); + } catch (AccumuloException ae) { + // assume an IOError communicating with !METADATA tablet + failedMutations.add(mutationsToProcess); + } catch (AccumuloSecurityException e) { + updateAuthorizationFailures(Collections.singletonMap(new KeyExtent(new Text(Constants.METADATA_TABLE_ID), null, null), + SecurityErrorCode.valueOf(e.getSecurityErrorCode().name()))); + } catch (TableDeletedException e) { + updateUnknownErrors(e.getMessage(), e); + } catch (TableOfflineException e) { + updateUnknownErrors(e.getMessage(), e); + } catch (TableNotFoundException e) { + updateUnknownErrors(e.getMessage(), e); + } + + // an error ocurred + binnedMutations.clear(); + + } + + void addMutations(MutationSet mutationsToSend) { + Map<String,TabletServerMutations> binnedMutations = new HashMap<String,TabletServerMutations>(); + Span span = Trace.start("binMutations"); + try { + long t1 = System.currentTimeMillis(); + binMutations(mutationsToSend, binnedMutations); + long t2 = System.currentTimeMillis(); + updateBinningStats(mutationsToSend.size(), (t2 - t1), binnedMutations); + } finally { + span.stop(); + } + addMutations(binnedMutations); + } + + private synchronized void addMutations(Map<String,TabletServerMutations> binnedMutations) { + + int count = 0; + + // merge mutations into existing mutations for a tablet server + for (Entry<String,TabletServerMutations> entry : binnedMutations.entrySet()) { + String server = entry.getKey(); + + TabletServerMutations currentMutations = serversMutations.get(server); + + if (currentMutations == null) { + serversMutations.put(server, entry.getValue()); + } else { + for (Entry<KeyExtent,List<Mutation>> entry2 : entry.getValue().getMutations().entrySet()) { + for (Mutation m : entry2.getValue()) { + currentMutations.addMutation(entry2.getKey(), m); + } + } + } + + if (log.isTraceEnabled()) + for (Entry<KeyExtent,List<Mutation>> entry2 : entry.getValue().getMutations().entrySet()) + count += entry2.getValue().size(); + + } + + if (count > 0 && log.isTraceEnabled()) + log.trace(String.format("Started sending %,d mutations to %,d tablet servers", count, binnedMutations.keySet().size())); + + // randomize order of servers + ArrayList<String> servers = new ArrayList<String>(binnedMutations.keySet()); + Collections.shuffle(servers); + + for (String server : servers) + if (!queued.contains(server)) { + sendThreadPool.submit(Trace.wrap(new SendTask(server))); + queued.add(server); + } + } + + private synchronized TabletServerMutations getMutationsToSend(String server) { + TabletServerMutations tsmuts = serversMutations.remove(server); + if (tsmuts == null) + queued.remove(server); + + return tsmuts; + } + + class SendTask implements Runnable { + + private String location; + + SendTask(String server) { + this.location = server; + } + + @Override + public void run() { + try { + TabletServerMutations tsmuts = getMutationsToSend(location); + + while (tsmuts != null) { + send(tsmuts); + tsmuts = getMutationsToSend(location); + } + + return; + } catch (Throwable t) { + updateUnknownErrors("Failed to send tablet server " + location + " its batch : " + t.getMessage(), t); + } + } + + public void send(TabletServerMutations tsm) throws AccumuloServerException, AccumuloSecurityException { + + MutationSet failures = null; + + String oldName = Thread.currentThread().getName(); + + Map<KeyExtent,List<Mutation>> mutationBatch = tsm.getMutations(); + try { + + long count = 0; + for (List<Mutation> list : mutationBatch.values()) { + count += list.size(); + } + String msg = "sending " + String.format("%,d", count) + " mutations to " + String.format("%,d", mutationBatch.size()) + " tablets at " + location; + Thread.currentThread().setName(msg); + + Span span = Trace.start("sendMutations"); + try { + + TimeoutTracker timeoutTracker = timeoutTrackers.get(location); + if (timeoutTracker == null) { + timeoutTracker = new TimeoutTracker(location, timeout); + timeoutTrackers.put(location, timeoutTracker); + } + + long st1 = System.currentTimeMillis(); + failures = sendMutationsToTabletServer(location, mutationBatch, timeoutTracker); + long st2 = System.currentTimeMillis(); + if (log.isTraceEnabled()) + log.trace("sent " + String.format("%,d", count) + " mutations to " + location + " in " + + String.format("%.2f secs (%,.2f mutations/sec) with %,d failures", (st2 - st1) / 1000.0, count / ((st2 - st1) / 1000.0), failures.size())); + + long successBytes = 0; + for (Entry<KeyExtent,List<Mutation>> entry : mutationBatch.entrySet()) { + for (Mutation mutation : entry.getValue()) { + successBytes += mutation.estimatedMemoryUsed(); + } + } + + if (failures.size() > 0) { + failedMutations.add(failures); + successBytes -= failures.getMemoryUsed(); + } + + updateSendStats(count, st2 - st1); + decrementMemUsed(successBytes); + + } finally { + span.stop(); + } + } catch (IOException e) { + if (log.isTraceEnabled()) + log.trace("failed to send mutations to " + location + " : " + e.getMessage()); + + HashSet<String> tables = new HashSet<String>(); + for (KeyExtent ke : mutationBatch.keySet()) + tables.add(ke.getTableId().toString()); + + for (String table : tables) + TabletLocator.getInstance(instance, new Text(table)).invalidateCache(location); + + failedMutations.add(location, tsm); + } finally { + Thread.currentThread().setName(oldName); + } + } + } + + private MutationSet sendMutationsToTabletServer(String location, Map<KeyExtent,List<Mutation>> tabMuts, TimeoutTracker timeoutTracker) throws IOException, + AccumuloSecurityException, AccumuloServerException { + if (tabMuts.size() == 0) { + return new MutationSet(); + } + TInfo tinfo = Tracer.traceInfo(); + TTransport transport = null; + + timeoutTracker.startingWrite(); + + try { + TabletClientService.Iface client; + + if (timeoutTracker.getTimeOut() < instance.getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT)) + client = ThriftUtil.getTServerClient(location, instance.getConfiguration(), timeoutTracker.getTimeOut()); + else + client = ThriftUtil.getTServerClient(location, instance.getConfiguration()); + + try { + MutationSet allFailures = new MutationSet(); + + if (tabMuts.size() == 1 && tabMuts.values().iterator().next().size() == 1) { + Entry<KeyExtent,List<Mutation>> entry = tabMuts.entrySet().iterator().next(); + + try { + client.update(tinfo, credentials, entry.getKey().toThrift(), entry.getValue().get(0).toThrift()); + } catch (NotServingTabletException e) { + allFailures.addAll(entry.getKey().getTableId().toString(), entry.getValue()); + TabletLocator.getInstance(instance, new Text(entry.getKey().getTableId())).invalidateCache(entry.getKey()); + } catch (ConstraintViolationException e) { + updatedConstraintViolations(Translator.translate(e.violationSummaries, Translator.TCVST)); + } + timeoutTracker.madeProgress(); + } else { + + long usid = client.startUpdate(tinfo, credentials); + + List<TMutation> updates = new ArrayList<TMutation>(); + for (Entry<KeyExtent,List<Mutation>> entry : tabMuts.entrySet()) { + long size = 0; + Iterator<Mutation> iter = entry.getValue().iterator(); + while (iter.hasNext()) { + while (size < MUTATION_BATCH_SIZE && iter.hasNext()) { + Mutation mutation = iter.next(); + updates.add(mutation.toThrift()); + size += mutation.numBytes(); + } + + client.applyUpdates(tinfo, usid, entry.getKey().toThrift(), updates); + updates.clear(); + size = 0; + } + } + + UpdateErrors updateErrors = client.closeUpdate(tinfo, usid); + + Map<KeyExtent,Long> failures = Translator.translate(updateErrors.failedExtents, Translator.TKET); + updatedConstraintViolations(Translator.translate(updateErrors.violationSummaries, Translator.TCVST)); + updateAuthorizationFailures(Translator.translate(updateErrors.authorizationFailures, Translator.TKET)); + + long totalCommitted = 0; + + for (Entry<KeyExtent,Long> entry : failures.entrySet()) { + KeyExtent failedExtent = entry.getKey(); + int numCommitted = (int) (long) entry.getValue(); + totalCommitted += numCommitted; + + String table = failedExtent.getTableId().toString(); + + TabletLocator.getInstance(instance, new Text(table)).invalidateCache(failedExtent); + + ArrayList<Mutation> mutations = (ArrayList<Mutation>) tabMuts.get(failedExtent); + allFailures.addAll(table, mutations.subList(numCommitted, mutations.size())); + } + + if (failures.keySet().containsAll(tabMuts.keySet()) && totalCommitted == 0) { + // nothing was successfully written + timeoutTracker.wroteNothing(); + } else { + // successfully wrote something to tablet server + timeoutTracker.madeProgress(); + } + } + return allFailures; + } finally { + ThriftUtil.returnClient((TServiceClient) client); + } + } catch (TTransportException e) { + timeoutTracker.errorOccured(e); + throw new IOException(e); + } catch (TApplicationException tae) { + updateServerErrors(location, tae); + throw new AccumuloServerException(location, tae); + } catch (ThriftSecurityException e) { + updateAuthorizationFailures(tabMuts.keySet(), e.code); + throw new AccumuloSecurityException(e.user, e.code, e); + } catch (NoSuchScanIDException e) { + throw new IOException(e); + } catch (TException e) { + throw new IOException(e); + } finally { + ThriftTransportPool.getInstance().returnTransport(transport); + } + } + } + + // END code for sending mutations to tablet servers using background threads + + private static class MutationSet { + + private HashMap<String,List<Mutation>> mutations; + private int memoryUsed = 0; + + MutationSet() { + mutations = new HashMap<String,List<Mutation>>(); + } + + void addMutation(String table, Mutation mutation) { + List<Mutation> tabMutList = mutations.get(table); + if (tabMutList == null) { + tabMutList = new ArrayList<Mutation>(); + mutations.put(table, tabMutList); + } + + tabMutList.add(mutation); + + memoryUsed += mutation.estimatedMemoryUsed(); + } + + Map<String,List<Mutation>> getMutations() { + return mutations; + } + + int size() { + int result = 0; + for (List<Mutation> perTable : mutations.values()) { + result += perTable.size(); + } + return result; + } + + public void addAll(MutationSet failures) { + Set<Entry<String,List<Mutation>>> es = failures.getMutations().entrySet(); + + for (Entry<String,List<Mutation>> entry : es) { + String table = entry.getKey(); + + for (Mutation mutation : entry.getValue()) { + addMutation(table, mutation); + } + } + } + + public void addAll(String table, List<Mutation> mutations) { + for (Mutation mutation : mutations) { + addMutation(table, mutation); + } + } + + public int getMemoryUsed() { + return memoryUsed; + } + + } +}