Repository: accumulo Updated Branches: refs/heads/master 85c1a496b -> 6bd2a457f
ACCUMULO-3030 allow scanners to be interrupted Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/fbd0fc6b Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/fbd0fc6b Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/fbd0fc6b Branch: refs/heads/master Commit: fbd0fc6baf0c84486c1aebc1d8494647b1f7d068 Parents: e739e57 Author: Eric C. Newton <eric.new...@gmail.com> Authored: Tue Aug 5 07:57:34 2014 -0400 Committer: Eric C. Newton <eric.new...@gmail.com> Committed: Tue Aug 5 07:58:24 2014 -0400 ---------------------------------------------------------------------- .../core/client/impl/ThriftScanner.java | 16 ++-- .../apache/accumulo/test/Accumulo3030IT.java | 83 ++++++++++++++++++++ 2 files changed, 93 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/fbd0fc6b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java index b765ef4..c49330e 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java @@ -57,7 +57,6 @@ import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; import org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException; import org.apache.accumulo.core.util.OpTimer; import org.apache.accumulo.core.util.ThriftUtil; -import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.trace.instrument.Span; import org.apache.accumulo.trace.instrument.Trace; import org.apache.accumulo.trace.instrument.Tracer; @@ -212,6 +211,9 @@ public class ThriftScanner { Span span = Trace.start("scan"); try { while (results == null && !scanState.finished) { + if (Thread.currentThread().isInterrupted()) { + throw new AccumuloException("Thread interrupted"); + } if ((System.currentTimeMillis() - startTime) / 1000.0 > timeOut) throw new ScanTimedOutException(); @@ -237,7 +239,7 @@ public class ThriftScanner { else if (log.isTraceEnabled()) log.trace(error); lastError = error; - UtilWaitThread.sleep(100); + Thread.sleep(100); } else { // when a tablet splits we do want to continue scanning the low child // of the split if we are already passed it @@ -265,7 +267,7 @@ public class ThriftScanner { log.trace(error); lastError = error; - UtilWaitThread.sleep(100); + Thread.sleep(100); } finally { locateSpan.stop(); } @@ -300,7 +302,7 @@ public class ThriftScanner { if (scanState.isolated) throw new IsolationException(); - UtilWaitThread.sleep(100); + Thread.sleep(100); } catch (NoSuchScanIDException e) { error = "Scan failed, no such scan id " + scanState.scanID + " " + loc; if (!error.equals(lastError)) @@ -335,7 +337,7 @@ public class ThriftScanner { if (scanState.isolated) throw new IsolationException(); - UtilWaitThread.sleep(100); + Thread.sleep(100); } catch (TException e) { TabletLocator.getLocator(instance, scanState.tableId).invalidateCache(loc.tablet_location); error = "Scan failed, thrift error " + e.getClass().getName() + " " + e.getMessage() + " " + loc; @@ -353,7 +355,7 @@ public class ThriftScanner { if (scanState.isolated) throw new IsolationException(); - UtilWaitThread.sleep(100); + Thread.sleep(100); } finally { scanLocation.stop(); } @@ -364,6 +366,8 @@ public class ThriftScanner { } return results; + } catch (InterruptedException ex) { + throw new AccumuloException(ex); } finally { span.stop(); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/fbd0fc6b/test/src/test/java/org/apache/accumulo/test/Accumulo3030IT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/Accumulo3030IT.java b/test/src/test/java/org/apache/accumulo/test/Accumulo3030IT.java new file mode 100644 index 0000000..bc56346 --- /dev/null +++ b/test/src/test/java/org/apache/accumulo/test/Accumulo3030IT.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + +import java.util.Map.Entry; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.functional.ConfigurableMacIT; +import org.apache.accumulo.test.functional.SlowIterator; +import org.apache.hadoop.conf.Configuration; +import org.junit.Assert; +import org.junit.Test; + +public class Accumulo3030IT extends ConfigurableMacIT { + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setNumTservers(1); + } + + @Test(timeout = 60 * 1000) + public void test() throws Exception { + // make a table + final String tableName = getUniqueNames(1)[0]; + final Connector conn = getConnector(); + conn.tableOperations().create(tableName); + // make the world's slowest scanner + final Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY); + final IteratorSetting cfg = new IteratorSetting(100, SlowIterator.class); + SlowIterator.setSeekSleepTime(cfg, 99999*1000); + scanner.addScanIterator(cfg); + // create a thread to interrupt the slow scan + final Thread scanThread = Thread.currentThread(); + Thread thread = new Thread() { + @Override + public void run() { + try { + // ensure the scan is running: not perfect, the metadata tables could be scanned, too. + String tserver = conn.instanceOperations().getTabletServers().iterator().next(); + while (conn.instanceOperations().getActiveScans(tserver).size() < 1) { + UtilWaitThread.sleep(1000); + } + } catch (Exception e) { + e.printStackTrace(); + } + // BAM! + scanThread.interrupt(); + } + }; + thread.start(); + try { + // Use the scanner, expect problems + for (@SuppressWarnings("unused") Entry<Key,Value> entry : scanner) { + } + Assert.fail("Scan should not succeed"); + } catch (Exception ex) { + } finally { + thread.join(); + } + } + +}