Repository: accumulo
Updated Branches:
  refs/heads/master ab0378024 -> 10a100680


ACCUMULO-2990 minor cleanup of TSBW: sprinkled "final" and fixed security error 
message


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/10a10068
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/10a10068
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/10a10068

Branch: refs/heads/master
Commit: 10a100680ba873a6618d932e44dbe1c1b2173052
Parents: 268c8bc
Author: Eric C. Newton <eric.new...@gmail.com>
Authored: Fri Jul 11 16:54:03 2014 -0400
Committer: Eric C. Newton <eric.new...@gmail.com>
Committed: Fri Jul 11 16:54:24 2014 -0400

----------------------------------------------------------------------
 .../core/client/impl/BatchWriterImpl.java       |   4 +-
 .../client/impl/TabletServerBatchWriter.java    | 100 ++++++++-----------
 2 files changed, 46 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/10a10068/core/src/main/java/org/apache/accumulo/core/client/impl/BatchWriterImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/impl/BatchWriterImpl.java 
b/core/src/main/java/org/apache/accumulo/core/client/impl/BatchWriterImpl.java
index ac41af9..bd76a50 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/client/impl/BatchWriterImpl.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/client/impl/BatchWriterImpl.java
@@ -26,8 +26,8 @@ import org.apache.accumulo.core.security.Credentials;
 
 public class BatchWriterImpl implements BatchWriter {
   
-  private String table;
-  private TabletServerBatchWriter bw;
+  private final String table;
+  private final TabletServerBatchWriter bw;
   
   public BatchWriterImpl(Instance instance, Credentials credentials, String 
table, BatchWriterConfig config) {
     checkArgument(instance != null, "instance is null");

http://git-wip-us.apache.org/repos/asf/accumulo/blob/10a10068/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
 
b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
index 76db4be..f2dd980 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
@@ -55,7 +55,6 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.thrift.TMutation;
 import org.apache.accumulo.core.data.thrift.UpdateErrors;
 import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.security.Credentials;
 import 
org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
 import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
@@ -72,7 +71,6 @@ import org.apache.log4j.Logger;
 import org.apache.thrift.TApplicationException;
 import org.apache.thrift.TException;
 import org.apache.thrift.TServiceClient;
-import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 
 /*
@@ -102,36 +100,34 @@ public class TabletServerBatchWriter {
   
   private static final Logger log = 
Logger.getLogger(TabletServerBatchWriter.class);
   
-  private long totalMemUsed = 0;
-  private long maxMem;
-  private MutationSet mutations;
+  // basic configuration
+  private final Instance instance;
+  private final Credentials credentials;
+  private final long maxMem;
+  private final long maxLatency;
+  private final long timeout;
+  
+  // state
   private boolean flushing;
   private boolean closed;
-  private MutationWriter writer;
-  private FailedMutations failedMutations;
-  
-  private Instance instance;
-  private Credentials credentials;
-  
-  private Violations violations;
-  private Map<KeyExtent,Set<SecurityErrorCode>> authorizationFailures;
-  private HashSet<String> serverSideErrors;
-  private int unknownErrors = 0;
-  private boolean somethingFailed = false;
-  
-  private Timer jtimer;
-  
-  private long maxLatency;
+  private MutationSet mutations;
   
-  private long timeout;
+  // background writer
+  private final MutationWriter writer;
   
+  // latency timers
+  private final Timer jtimer = new Timer("BatchWriterLatencyTimer", true);
+  private final Map<String,TimeoutTracker> timeoutTrackers = 
Collections.synchronizedMap(new 
HashMap<String,TabletServerBatchWriter.TimeoutTracker>());
+
+  // stats
+  private long totalMemUsed = 0;
   private long lastProcessingStartTime;
   
   private long totalAdded = 0;
-  private AtomicLong totalSent = new AtomicLong(0);
-  private AtomicLong totalBinned = new AtomicLong(0);
-  private AtomicLong totalBinTime = new AtomicLong(0);
-  private AtomicLong totalSendTime = new AtomicLong(0);
+  private final AtomicLong totalSent = new AtomicLong(0);
+  private final AtomicLong totalBinned = new AtomicLong(0);
+  private final AtomicLong totalBinTime = new AtomicLong(0);
+  private final AtomicLong totalSendTime = new AtomicLong(0);
   private long startTime = 0;
   private long initialGCTimes;
   private long initialCompileTimes;
@@ -145,14 +141,19 @@ public class TabletServerBatchWriter {
   private int minTabletServersBatch = Integer.MAX_VALUE;
   private int maxTabletServersBatch = Integer.MIN_VALUE;
   
+  // error handling
+  private final Violations violations = new Violations();
+  private final Map<KeyExtent,Set<SecurityErrorCode>> authorizationFailures = 
new HashMap<KeyExtent,Set<SecurityErrorCode>>();
+  private final HashSet<String> serverSideErrors = new HashSet<String>();
+  private final FailedMutations failedMutations = new FailedMutations();
+  private int unknownErrors = 0;
+  private boolean somethingFailed = false;
   private Throwable lastUnknownError = null;
   
-  private Map<String,TimeoutTracker> timeoutTrackers;
-  
   private static class TimeoutTracker {
     
-    String server;
-    long timeOut;
+    final String server;
+    final long timeOut;
     long activityTime;
     Long firstErrorTime = null;
     
@@ -193,21 +194,10 @@ public class TabletServerBatchWriter {
     this.maxLatency = config.getMaxLatency(TimeUnit.MILLISECONDS) <= 0 ? 
Long.MAX_VALUE : config.getMaxLatency(TimeUnit.MILLISECONDS);
     this.credentials = credentials;
     this.timeout = config.getTimeout(TimeUnit.MILLISECONDS);
-    mutations = new MutationSet();
-    
-    violations = new Violations();
-    
-    authorizationFailures = new HashMap<KeyExtent,Set<SecurityErrorCode>>();
-    serverSideErrors = new HashSet<String>();
+    this.mutations = new MutationSet();
+    this.lastProcessingStartTime = System.currentTimeMillis();
     
-    lastProcessingStartTime = System.currentTimeMillis();
-    
-    jtimer = new Timer("BatchWriterLatencyTimer", true);
-    
-    writer = new MutationWriter(config.getMaxWriteThreads());
-    failedMutations = new FailedMutations();
-    
-    timeoutTrackers = Collections.synchronizedMap(new 
HashMap<String,TabletServerBatchWriter.TimeoutTracker>());
+    this.writer = new MutationWriter(config.getMaxWriteThreads());
     
     if (this.maxLatency != Long.MAX_VALUE) {
       jtimer.schedule(new TimerTask() {
@@ -331,6 +321,7 @@ public class TabletServerBatchWriter {
       checkForFailures();
     } finally {
       span.stop();
+      // somethingFailed = false;
     }
   }
   
@@ -616,10 +607,10 @@ public class TabletServerBatchWriter {
   private class MutationWriter {
     
     private static final int MUTATION_BATCH_SIZE = 1 << 17;
-    private ExecutorService sendThreadPool;
-    private Map<String,TabletServerMutations<Mutation>> serversMutations;
-    private Set<String> queued;
-    private Map<String,TabletLocator> locators;
+    private final ExecutorService sendThreadPool;
+    private final Map<String,TabletServerMutations<Mutation>> serversMutations;
+    private final Set<String> queued;
+    private final Map<String,TabletLocator> locators;
     
     public MutationWriter(int numSendThreads) {
       serversMutations = new HashMap<String,TabletServerMutations<Mutation>>();
@@ -640,10 +631,12 @@ public class TabletServerBatchWriter {
     }
     
     private void binMutations(MutationSet mutationsToProcess, 
Map<String,TabletServerMutations<Mutation>> binnedMutations) {
+      String tableId = null;
       try {
         Set<Entry<String,List<Mutation>>> es = 
mutationsToProcess.getMutations().entrySet();
         for (Entry<String,List<Mutation>> entry : es) {
-          TabletLocator locator = getLocator(entry.getKey());
+          tableId = entry.getKey();
+          TabletLocator locator = getLocator(tableId);
           
           String table = entry.getKey();
           List<Mutation> tableMutations = entry.getValue();
@@ -671,7 +664,7 @@ public class TabletServerBatchWriter {
         // assume an IOError communicating with metadata tablet
         failedMutations.add(mutationsToProcess);
       } catch (AccumuloSecurityException e) {
-        updateAuthorizationFailures(Collections.singletonMap(new KeyExtent(new 
Text(MetadataTable.ID), null, null),
+        updateAuthorizationFailures(Collections.singletonMap(new KeyExtent(new 
Text(tableId), null, null),
             SecurityErrorCode.valueOf(e.getSecurityErrorCode().name())));
       } catch (TableDeletedException e) {
         updateUnknownErrors(e.getMessage(), e);
@@ -750,7 +743,7 @@ public class TabletServerBatchWriter {
     
     class SendTask implements Runnable {
       
-      private String location;
+      final private String location;
       
       SendTask(String server) {
         this.location = server;
@@ -847,9 +840,6 @@ public class TabletServerBatchWriter {
       }
       TInfo tinfo = Tracer.traceInfo();
       
-      // TODO remove this
-      TTransport transport = null;
-      
       timeoutTracker.startingWrite();
       
       try {
@@ -942,8 +932,6 @@ public class TabletServerBatchWriter {
         throw new IOException(e);
       } catch (TException e) {
         throw new IOException(e);
-      } finally {
-        ThriftTransportPool.getInstance().returnTransport(transport);
       }
     }
   }
@@ -952,7 +940,7 @@ public class TabletServerBatchWriter {
   
   private static class MutationSet {
     
-    private HashMap<String,List<Mutation>> mutations;
+    private final HashMap<String,List<Mutation>> mutations;
     private int memoryUsed = 0;
     
     MutationSet() {

Reply via email to