Repository: accumulo Updated Branches: refs/heads/master ab0378024 -> 10a100680
ACCUMULO-2990 minor cleanup of TSBW: sprinkled "final" and fixed security error message Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/10a10068 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/10a10068 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/10a10068 Branch: refs/heads/master Commit: 10a100680ba873a6618d932e44dbe1c1b2173052 Parents: 268c8bc Author: Eric C. Newton <eric.new...@gmail.com> Authored: Fri Jul 11 16:54:03 2014 -0400 Committer: Eric C. Newton <eric.new...@gmail.com> Committed: Fri Jul 11 16:54:24 2014 -0400 ---------------------------------------------------------------------- .../core/client/impl/BatchWriterImpl.java | 4 +- .../client/impl/TabletServerBatchWriter.java | 100 ++++++++----------- 2 files changed, 46 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/10a10068/core/src/main/java/org/apache/accumulo/core/client/impl/BatchWriterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/BatchWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/BatchWriterImpl.java index ac41af9..bd76a50 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/BatchWriterImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/BatchWriterImpl.java @@ -26,8 +26,8 @@ import org.apache.accumulo.core.security.Credentials; public class BatchWriterImpl implements BatchWriter { - private String table; - private TabletServerBatchWriter bw; + private final String table; + private final TabletServerBatchWriter bw; public BatchWriterImpl(Instance instance, Credentials credentials, String table, BatchWriterConfig config) { checkArgument(instance != null, "instance is null"); http://git-wip-us.apache.org/repos/asf/accumulo/blob/10a10068/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java index 76db4be..f2dd980 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java @@ -55,7 +55,6 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.thrift.TMutation; import org.apache.accumulo.core.data.thrift.UpdateErrors; import org.apache.accumulo.core.master.state.tables.TableState; -import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.security.Credentials; import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException; import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; @@ -72,7 +71,6 @@ import org.apache.log4j.Logger; import org.apache.thrift.TApplicationException; import org.apache.thrift.TException; import org.apache.thrift.TServiceClient; -import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; /* @@ -102,36 +100,34 @@ public class TabletServerBatchWriter { private static final Logger log = Logger.getLogger(TabletServerBatchWriter.class); - private long totalMemUsed = 0; - private long maxMem; - private MutationSet mutations; + // basic configuration + private final Instance instance; + private final Credentials credentials; + private final long maxMem; + private final long maxLatency; + private final long timeout; + + // state private boolean flushing; private boolean closed; - private MutationWriter writer; - private FailedMutations failedMutations; - - private Instance instance; - private Credentials credentials; - - private Violations violations; - private Map<KeyExtent,Set<SecurityErrorCode>> authorizationFailures; - private HashSet<String> serverSideErrors; - private int unknownErrors = 0; - private boolean somethingFailed = false; - - private Timer jtimer; - - private long maxLatency; + private MutationSet mutations; - private long timeout; + // background writer + private final MutationWriter writer; + // latency timers + private final Timer jtimer = new Timer("BatchWriterLatencyTimer", true); + private final Map<String,TimeoutTracker> timeoutTrackers = Collections.synchronizedMap(new HashMap<String,TabletServerBatchWriter.TimeoutTracker>()); + + // stats + private long totalMemUsed = 0; private long lastProcessingStartTime; private long totalAdded = 0; - private AtomicLong totalSent = new AtomicLong(0); - private AtomicLong totalBinned = new AtomicLong(0); - private AtomicLong totalBinTime = new AtomicLong(0); - private AtomicLong totalSendTime = new AtomicLong(0); + private final AtomicLong totalSent = new AtomicLong(0); + private final AtomicLong totalBinned = new AtomicLong(0); + private final AtomicLong totalBinTime = new AtomicLong(0); + private final AtomicLong totalSendTime = new AtomicLong(0); private long startTime = 0; private long initialGCTimes; private long initialCompileTimes; @@ -145,14 +141,19 @@ public class TabletServerBatchWriter { private int minTabletServersBatch = Integer.MAX_VALUE; private int maxTabletServersBatch = Integer.MIN_VALUE; + // error handling + private final Violations violations = new Violations(); + private final Map<KeyExtent,Set<SecurityErrorCode>> authorizationFailures = new HashMap<KeyExtent,Set<SecurityErrorCode>>(); + private final HashSet<String> serverSideErrors = new HashSet<String>(); + private final FailedMutations failedMutations = new FailedMutations(); + private int unknownErrors = 0; + private boolean somethingFailed = false; private Throwable lastUnknownError = null; - private Map<String,TimeoutTracker> timeoutTrackers; - private static class TimeoutTracker { - String server; - long timeOut; + final String server; + final long timeOut; long activityTime; Long firstErrorTime = null; @@ -193,21 +194,10 @@ public class TabletServerBatchWriter { this.maxLatency = config.getMaxLatency(TimeUnit.MILLISECONDS) <= 0 ? Long.MAX_VALUE : config.getMaxLatency(TimeUnit.MILLISECONDS); this.credentials = credentials; this.timeout = config.getTimeout(TimeUnit.MILLISECONDS); - mutations = new MutationSet(); - - violations = new Violations(); - - authorizationFailures = new HashMap<KeyExtent,Set<SecurityErrorCode>>(); - serverSideErrors = new HashSet<String>(); + this.mutations = new MutationSet(); + this.lastProcessingStartTime = System.currentTimeMillis(); - lastProcessingStartTime = System.currentTimeMillis(); - - jtimer = new Timer("BatchWriterLatencyTimer", true); - - writer = new MutationWriter(config.getMaxWriteThreads()); - failedMutations = new FailedMutations(); - - timeoutTrackers = Collections.synchronizedMap(new HashMap<String,TabletServerBatchWriter.TimeoutTracker>()); + this.writer = new MutationWriter(config.getMaxWriteThreads()); if (this.maxLatency != Long.MAX_VALUE) { jtimer.schedule(new TimerTask() { @@ -331,6 +321,7 @@ public class TabletServerBatchWriter { checkForFailures(); } finally { span.stop(); + // somethingFailed = false; } } @@ -616,10 +607,10 @@ public class TabletServerBatchWriter { private class MutationWriter { private static final int MUTATION_BATCH_SIZE = 1 << 17; - private ExecutorService sendThreadPool; - private Map<String,TabletServerMutations<Mutation>> serversMutations; - private Set<String> queued; - private Map<String,TabletLocator> locators; + private final ExecutorService sendThreadPool; + private final Map<String,TabletServerMutations<Mutation>> serversMutations; + private final Set<String> queued; + private final Map<String,TabletLocator> locators; public MutationWriter(int numSendThreads) { serversMutations = new HashMap<String,TabletServerMutations<Mutation>>(); @@ -640,10 +631,12 @@ public class TabletServerBatchWriter { } private void binMutations(MutationSet mutationsToProcess, Map<String,TabletServerMutations<Mutation>> binnedMutations) { + String tableId = null; try { Set<Entry<String,List<Mutation>>> es = mutationsToProcess.getMutations().entrySet(); for (Entry<String,List<Mutation>> entry : es) { - TabletLocator locator = getLocator(entry.getKey()); + tableId = entry.getKey(); + TabletLocator locator = getLocator(tableId); String table = entry.getKey(); List<Mutation> tableMutations = entry.getValue(); @@ -671,7 +664,7 @@ public class TabletServerBatchWriter { // assume an IOError communicating with metadata tablet failedMutations.add(mutationsToProcess); } catch (AccumuloSecurityException e) { - updateAuthorizationFailures(Collections.singletonMap(new KeyExtent(new Text(MetadataTable.ID), null, null), + updateAuthorizationFailures(Collections.singletonMap(new KeyExtent(new Text(tableId), null, null), SecurityErrorCode.valueOf(e.getSecurityErrorCode().name()))); } catch (TableDeletedException e) { updateUnknownErrors(e.getMessage(), e); @@ -750,7 +743,7 @@ public class TabletServerBatchWriter { class SendTask implements Runnable { - private String location; + final private String location; SendTask(String server) { this.location = server; @@ -847,9 +840,6 @@ public class TabletServerBatchWriter { } TInfo tinfo = Tracer.traceInfo(); - // TODO remove this - TTransport transport = null; - timeoutTracker.startingWrite(); try { @@ -942,8 +932,6 @@ public class TabletServerBatchWriter { throw new IOException(e); } catch (TException e) { throw new IOException(e); - } finally { - ThriftTransportPool.getInstance().returnTransport(transport); } } } @@ -952,7 +940,7 @@ public class TabletServerBatchWriter { private static class MutationSet { - private HashMap<String,List<Mutation>> mutations; + private final HashMap<String,List<Mutation>> mutations; private int memoryUsed = 0; MutationSet() {