Repository: accumulo Updated Branches: refs/heads/1.6.0-SNAPSHOT 1645b4f62 -> 0f3b4ca0a
ACCUMULO-2695 Fixed Conditional writer hang caused by tablet server fault. This change fixes issues that occurred when the conditional writer got an exception when trying to connect to a tserver. The conditional writer could throw an exception or lose mutations in this case. In either case the client would hang. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0f3b4ca0 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0f3b4ca0 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0f3b4ca0 Branch: refs/heads/1.6.0-SNAPSHOT Commit: 0f3b4ca0a321eb7514fe4b5c6dcc9b71c09ff187 Parents: 1645b4f Author: Keith Turner <ktur...@apache.org> Authored: Fri Apr 18 17:39:36 2014 -0400 Committer: Keith Turner <ktur...@apache.org> Committed: Fri Apr 18 17:39:36 2014 -0400 ---------------------------------------------------------------------- .../core/client/impl/ConditionalWriterImpl.java | 21 ++++++++------------ .../test/randomwalk/conditional/Init.java | 7 ++++++- 2 files changed, 14 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/0f3b4ca0/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java index 1ec6dee..01e4b95 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java @@ -559,15 +559,15 @@ class ConditionalWriterImpl implements ConditionalWriter { SessionID sessionId = null; - try { - - client = getClient(location); - + try { Map<TKeyExtent,List<TConditionalMutation>> tmutations = new 
HashMap<TKeyExtent,List<TConditionalMutation>>(); CompressedIterators compressedIters = new CompressedIterators(); convertMutations(mutations, cmidToCm, cmid, tmutations, compressedIters); + //getClient() call must come after converMutations in case it throws a TException + client = getClient(location); + List<TCMResult> tresults = null; while (tresults == null) { try { @@ -615,7 +615,8 @@ class ConditionalWriterImpl implements ConditionalWriter { } catch (Exception e) { queueException(location, cmidToCm, e); } finally { - unreserveSessionID(location); + if(sessionId != null) + unreserveSessionID(location); ThriftUtil.returnClient((TServiceClient) client); } } @@ -637,7 +638,7 @@ class ConditionalWriterImpl implements ConditionalWriter { queueRetry(cmidToCm, location); } else { try { - invalidateSession(sessionId, location, mutations); + invalidateSession(sessionId, location); for (CMK cmk : cmidToCm.values()) cmk.cm.queueResult(new Result(Status.UNKNOWN, cmk.cm, location)); } catch (Exception e2) { @@ -652,15 +653,9 @@ class ConditionalWriterImpl implements ConditionalWriter { * * If a conditional mutation is taking a long time to process, then this method will wait for it to finish... unless this exceeds timeout. 
*/ - private void invalidateSession(SessionID sessionId, String location, TabletServerMutations<QCMutation> mutations) throws AccumuloException, + private void invalidateSession(SessionID sessionId, String location) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - ArrayList<QCMutation> mutList = new ArrayList<QCMutation>(); - - for (List<QCMutation> tml : mutations.getMutations().values()) { - mutList.addAll(tml); - } - long sleepTime = 50; long startTime = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/0f3b4ca0/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Init.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Init.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Init.java index bfad730..28f1fd8 100644 --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Init.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Init.java @@ -66,7 +66,12 @@ public class Init extends Test { m.put(cf, "seq", Utils.getSeq(0)); if (j % 1000 == 0 && j > 0) { - if (cw.write(m).getStatus() == Status.ACCEPTED) + Status status = cw.write(m).getStatus(); + + while(status == Status.UNKNOWN) + status = cw.write(m).getStatus(); + + if (status == Status.ACCEPTED) acceptedCount++; m = new ConditionalMutation(Utils.getBank(i)); }