Merge branch '1.6' into 1.7

Conflicts:
    server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e76d89a8 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e76d89a8 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e76d89a8 Branch: refs/heads/master Commit: e76d89a8b5070840cacabbe5b29c7ec569be66ec Parents: 5ef5b85 25475d0 Author: Christopher Tubbs <ctubb...@apache.org> Authored: Tue Jun 16 17:37:08 2015 -0400 Committer: Christopher Tubbs <ctubb...@apache.org> Committed: Tue Jun 16 17:37:08 2015 -0400 ---------------------------------------------------------------------- .../apache/accumulo/tserver/TabletServer.java | 47 +++++++++++++++----- 1 file changed, 36 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/e76d89a8/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 5656db4,3cffa38..8b6ad30 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@@ -764,13 -1586,18 +765,21 @@@ public class TabletServer extends Accum for (TMutation tmutation : tmutations) { Mutation mutation = new ServerMutation(tmutation); mutations.add(mutation); - us.queuedMutationSize += mutation.numBytes(); + additionalMutationSize += mutation.numBytes(); } - if (us.queuedMutationSize > getSystemConfiguration().getMemoryInBytes(Property.TSERV_MUTATION_QUEUE_MAX)) { + us.queuedMutationSize += additionalMutationSize; + long totalQueued = updateTotalQueuedMutationSize(additionalMutationSize); + long total = TabletServer.this.getConfiguration().getMemoryInBytes(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX); + if (totalQueued > total) { - flush(us); + try { + 
flush(us); + } catch (HoldTimeoutException hte) { + // Assumption is that the client has timed out and is gone. If thats not the case, then removing the session should cause the client to fail + // in such a way that it retries. + log.debug("HoldTimeoutException during applyUpdates, removing session"); + sessionManager.removeSession(updateID, true); + reserved = false; + } } } } finally { @@@ -987,18 -1818,24 +998,25 @@@ if (tablet == null) { throw new NotServingTabletException(tkeyExtent); } + Durability tabletDurability = tablet.getDurability(); - if (!keyExtent.isMeta()) - TabletServer.this.resourceManager.waitUntilCommitsAreEnabled(); + if (!keyExtent.isMeta()) { + try { + TabletServer.this.resourceManager.waitUntilCommitsAreEnabled(); + } catch (HoldTimeoutException hte) { + // Major hack. Assumption is that the client has timed out and is gone. If thats not the case, then throwing the following will let client know there + // was a failure and it should retry. + throw new NotServingTabletException(tkeyExtent); + } + } - long opid = writeTracker.startWrite(TabletType.type(keyExtent)); + final long opid = writeTracker.startWrite(TabletType.type(keyExtent)); try { - Mutation mutation = new ServerMutation(tmutation); - List<Mutation> mutations = Collections.singletonList(mutation); + final Mutation mutation = new ServerMutation(tmutation); + final List<Mutation> mutations = Collections.singletonList(mutation); - Span prep = Trace.start("prep"); + final Span prep = Trace.start("prep"); CommitSession cs; try { cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security, credentials), mutations);