Updated Branches: refs/heads/master 70031c4dc -> 05b3359b9
ACCUMULO-1566 Pass down the readaheadThreshold parameter from the client to the server so that the same limit is adhered to by the server in regards to pipelining. Thanks to Keith for his help here. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/05b3359b Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/05b3359b Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/05b3359b Branch: refs/heads/master Commit: 05b3359b9c6643cbdb2284afead9a0dcac2a9300 Parents: 70031c4 Author: Josh Elser <[email protected]> Authored: Tue Oct 8 18:54:30 2013 -0400 Committer: Josh Elser <[email protected]> Committed: Tue Oct 8 23:00:47 2013 -0400 ---------------------------------------------------------------------- .../core/client/impl/ScannerIterator.java | 2 +- .../core/client/impl/ThriftScanner.java | 15 ++- .../thrift/TabletClientService.java | 124 +++++++++++++++++-- core/src/main/thrift/tabletserver.thrift | 3 +- .../server/tabletserver/TabletServer.java | 10 +- .../test/performance/thrift/NullTserver.java | 2 +- 6 files changed, 133 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/05b3359b/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java index e9d1412..b202d3a 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java @@ -145,7 +145,7 @@ public class ScannerIterator implements Iterator<Entry<Key,Value>> { } scanState = new ScanState(instance, credentials, tableId, authorizations, new Range(range), options.fetchedColumns, size, options.serverSideIteratorList, - options.serverSideIteratorOptions, isolated); + options.serverSideIteratorOptions, isolated, readaheadThreshold); // If we want to start readahead immediately, don't wait for hasNext to be called if (0l == readaheadThreshold) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/05b3359b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java index efdd142..efb31e8 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java @@ -27,6 +27,7 @@ import java.util.Set; import java.util.SortedMap; import java.util.SortedSet; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Instance; @@ -97,7 +98,7 @@ public class ThriftScanner { boolean waitForWrites = !serversWaitedForWrites.get(ttype).contains(server); InitialScan isr = client.startScan(tinfo, scanState.credentials.toThrift(instance), extent.toThrift(), scanState.range.toThrift(), Translator.translate(scanState.columns, Translator.CT), scanState.size, scanState.serverSideIteratorList, scanState.serverSideIteratorOptions, - scanState.authorizations.getAuthorizationsBB(), waitForWrites, scanState.isolated); + scanState.authorizations.getAuthorizationsBB(), waitForWrites, scanState.isolated, scanState.readaheadThreshold); if (waitForWrites) serversWaitedForWrites.get(ttype).add(server); @@ -132,6 +133,7 @@ public class ThriftScanner { Text tableId; Text startRow; boolean skipStartRow; + long readaheadThreshold; Range range; @@ -150,9 +152,15 @@ public class ThriftScanner { List<IterInfo> serverSideIteratorList; Map<String,Map<String,String>> serverSideIteratorOptions; - + public ScanState(Instance instance, Credentials credentials, Text tableId, Authorizations authorizations, Range range, SortedSet<Column> fetchedColumns, int size, List<IterInfo> serverSideIteratorList, Map<String,Map<String,String>> serverSideIteratorOptions, boolean isolated) { + this(instance, credentials, tableId, authorizations, range, fetchedColumns, size, serverSideIteratorList, serverSideIteratorOptions, isolated, + Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD); + } + + public ScanState(Instance instance, Credentials credentials, Text tableId, Authorizations authorizations, Range range, SortedSet<Column> fetchedColumns, + int size, List<IterInfo> serverSideIteratorList, Map<String,Map<String,String>> serverSideIteratorOptions, boolean isolated, long readaheadThreshold) { this.instance = instance; this.credentials = credentials; this.authorizations = authorizations; @@ -179,6 +187,7 @@ public class ThriftScanner { this.serverSideIteratorOptions = serverSideIteratorOptions; this.isolated = isolated; + this.readaheadThreshold = readaheadThreshold; } } @@ -389,7 +398,7 @@ public class ThriftScanner { boolean waitForWrites = !serversWaitedForWrites.get(ttype).contains(loc.tablet_location); InitialScan is = client.startScan(tinfo, scanState.credentials.toThrift(scanState.instance), loc.tablet_extent.toThrift(), scanState.range.toThrift(), Translator.translate(scanState.columns, Translator.CT), scanState.size, scanState.serverSideIteratorList, scanState.serverSideIteratorOptions, - scanState.authorizations.getAuthorizationsBB(), waitForWrites, scanState.isolated); + scanState.authorizations.getAuthorizationsBB(), waitForWrites, scanState.isolated, scanState.readaheadThreshold); if (waitForWrites) serversWaitedForWrites.get(ttype).add(loc.tablet_location); http://git-wip-us.apache.org/repos/asf/accumulo/blob/05b3359b/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java b/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java index bd6578d..d02b5da 100644 --- a/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java +++ b/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java @@ -50,7 +50,7 @@ import org.slf4j.LoggerFactory; public interface Iface extends org.apache.accumulo.core.client.impl.thrift.ClientService.Iface { - public org.apache.accumulo.core.data.thrift.InitialScan startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, NotServingTabletException, TooManyFilesException, org.apache.thrift.TException; + public org.apache.accumulo.core.data.thrift.InitialScan startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, NotServingTabletException, TooManyFilesException, org.apache.thrift.TException; public org.apache.accumulo.core.data.thrift.ScanResult continueScan(org.apache.accumulo.trace.thrift.TInfo tinfo, long scanID) throws NoSuchScanIDException, NotServingTabletException, TooManyFilesException, org.apache.thrift.TException; @@ -114,7 +114,7 @@ import org.slf4j.LoggerFactory; public interface AsyncIface extends org.apache.accumulo.core.client.impl.thrift.ClientService .AsyncIface { - public void startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.startScan_call> resultHandler) throws org.apache.thrift.TException; + public void startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.startScan_call> resultHandler) throws org.apache.thrift.TException; public void continueScan(org.apache.accumulo.trace.thrift.TInfo tinfo, long scanID, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.continueScan_call> resultHandler) throws org.apache.thrift.TException; @@ -196,13 +196,13 @@ import org.slf4j.LoggerFactory; super(iprot, oprot); } - public org.apache.accumulo.core.data.thrift.InitialScan startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, NotServingTabletException, TooManyFilesException, org.apache.thrift.TException + public org.apache.accumulo.core.data.thrift.InitialScan startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, NotServingTabletException, TooManyFilesException, org.apache.thrift.TException { - send_startScan(tinfo, credentials, extent, range, columns, batchSize, ssiList, ssio, authorizations, waitForWrites, isolated); + send_startScan(tinfo, credentials, extent, range, columns, batchSize, ssiList, ssio, authorizations, waitForWrites, isolated, readaheadThreshold); return recv_startScan(); } - public void send_startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated) throws org.apache.thrift.TException + public void send_startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold) throws org.apache.thrift.TException { startScan_args args = new startScan_args(); args.setTinfo(tinfo); @@ -216,6 +216,7 @@ import org.slf4j.LoggerFactory; args.setAuthorizations(authorizations); args.setWaitForWrites(waitForWrites); args.setIsolated(isolated); + args.setReadaheadThreshold(readaheadThreshold); sendBase("startScan", args); } @@ -922,9 +923,9 @@ import org.slf4j.LoggerFactory; super(protocolFactory, clientManager, transport); } - public void startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, org.apache.thrift.async.AsyncMethodCallback<startScan_call> resultHandler) throws org.apache.thrift.TException { + public void startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold, org.apache.thrift.async.AsyncMethodCallback<startScan_call> resultHandler) throws org.apache.thrift.TException { checkReady(); - startScan_call method_call = new startScan_call(tinfo, credentials, extent, range, columns, batchSize, ssiList, ssio, authorizations, waitForWrites, isolated, resultHandler, this, ___protocolFactory, ___transport); + startScan_call method_call = new startScan_call(tinfo, credentials, extent, range, columns, batchSize, ssiList, ssio, authorizations, waitForWrites, isolated, readaheadThreshold, resultHandler, this, ___protocolFactory, ___transport); this.___currentMethod = method_call; ___manager.call(method_call); } @@ -941,7 +942,8 @@ import org.slf4j.LoggerFactory; private List<ByteBuffer> authorizations; private boolean waitForWrites; private boolean isolated; - public startScan_call(org.apache.accumulo.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, org.apache.thrift.async.AsyncMethodCallback<startScan_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { + private long readaheadThreshold; + public startScan_call(org.apache.accumulo.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold, org.apache.thrift.async.AsyncMethodCallback<startScan_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { super(client, protocolFactory, transport, resultHandler, false); this.tinfo = tinfo; this.credentials = credentials; @@ -954,6 +956,7 @@ import org.slf4j.LoggerFactory; this.authorizations = authorizations; this.waitForWrites = waitForWrites; this.isolated = isolated; + this.readaheadThreshold = readaheadThreshold; } public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException { @@ -970,6 +973,7 @@ import org.slf4j.LoggerFactory; args.setAuthorizations(authorizations); args.setWaitForWrites(waitForWrites); args.setIsolated(isolated); + args.setReadaheadThreshold(readaheadThreshold); args.write(prot); prot.writeMessageEnd(); } @@ -2170,7 +2174,7 @@ import org.slf4j.LoggerFactory; public startScan_result getResult(I iface, startScan_args args) throws org.apache.thrift.TException { startScan_result result = new startScan_result(); try { - result.success = iface.startScan(args.tinfo, args.credentials, args.extent, args.range, args.columns, args.batchSize, args.ssiList, args.ssio, args.authorizations, args.waitForWrites, args.isolated); + result.success = iface.startScan(args.tinfo, args.credentials, args.extent, args.range, args.columns, args.batchSize, args.ssiList, args.ssio, args.authorizations, args.waitForWrites, args.isolated, args.readaheadThreshold); } catch (org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException sec) { result.sec = sec; } catch (NotServingTabletException nste) { @@ -2846,6 +2850,7 @@ import org.slf4j.LoggerFactory; private static final org.apache.thrift.protocol.TField AUTHORIZATIONS_FIELD_DESC = new org.apache.thrift.protocol.TField("authorizations", org.apache.thrift.protocol.TType.LIST, (short)8); private static final org.apache.thrift.protocol.TField WAIT_FOR_WRITES_FIELD_DESC = new org.apache.thrift.protocol.TField("waitForWrites", org.apache.thrift.protocol.TType.BOOL, (short)9); private static final org.apache.thrift.protocol.TField ISOLATED_FIELD_DESC = new org.apache.thrift.protocol.TField("isolated", org.apache.thrift.protocol.TType.BOOL, (short)10); + private static final org.apache.thrift.protocol.TField READAHEAD_THRESHOLD_FIELD_DESC = new org.apache.thrift.protocol.TField("readaheadThreshold", org.apache.thrift.protocol.TType.I64, (short)12); private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); static { @@ -2864,6 +2869,7 @@ import org.slf4j.LoggerFactory; public List<ByteBuffer> authorizations; // required public boolean waitForWrites; // required public boolean isolated; // required + public long readaheadThreshold; // required /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -2877,7 +2883,8 @@ import org.slf4j.LoggerFactory; SSIO((short)7, "ssio"), AUTHORIZATIONS((short)8, "authorizations"), WAIT_FOR_WRITES((short)9, "waitForWrites"), - ISOLATED((short)10, "isolated"); + ISOLATED((short)10, "isolated"), + READAHEAD_THRESHOLD((short)12, "readaheadThreshold"); private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); @@ -2914,6 +2921,8 @@ import org.slf4j.LoggerFactory; return WAIT_FOR_WRITES; case 10: // ISOLATED return ISOLATED; + case 12: // READAHEAD_THRESHOLD + return READAHEAD_THRESHOLD; default: return null; } @@ -2957,6 +2966,7 @@ import org.slf4j.LoggerFactory; private static final int __BATCHSIZE_ISSET_ID = 0; private static final int __WAITFORWRITES_ISSET_ID = 1; private static final int __ISOLATED_ISSET_ID = 2; + private static final int __READAHEADTHRESHOLD_ISSET_ID = 3; private byte __isset_bitfield = 0; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { @@ -2990,6 +3000,8 @@ import org.slf4j.LoggerFactory; new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); tmpMap.put(_Fields.ISOLATED, new org.apache.thrift.meta_data.FieldMetaData("isolated", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); + tmpMap.put(_Fields.READAHEAD_THRESHOLD, new org.apache.thrift.meta_data.FieldMetaData("readaheadThreshold", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(startScan_args.class, metaDataMap); } @@ -3008,7 +3020,8 @@ import org.slf4j.LoggerFactory; Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, - boolean isolated) + boolean isolated, + long readaheadThreshold) { this(); this.tinfo = tinfo; @@ -3025,6 +3038,8 @@ import org.slf4j.LoggerFactory; setWaitForWritesIsSet(true); this.isolated = isolated; setIsolatedIsSet(true); + this.readaheadThreshold = readaheadThreshold; + setReadaheadThresholdIsSet(true); } /** @@ -3096,6 +3111,7 @@ import org.slf4j.LoggerFactory; } this.waitForWrites = other.waitForWrites; this.isolated = other.isolated; + this.readaheadThreshold = other.readaheadThreshold; } public startScan_args deepCopy() { @@ -3118,6 +3134,8 @@ import org.slf4j.LoggerFactory; this.waitForWrites = false; setIsolatedIsSet(false); this.isolated = false; + setReadaheadThresholdIsSet(false); + this.readaheadThreshold = 0; } public org.apache.accumulo.trace.thrift.TInfo getTinfo() { @@ -3437,6 +3455,29 @@ import org.slf4j.LoggerFactory; __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __ISOLATED_ISSET_ID, value); } + public long getReadaheadThreshold() { + return this.readaheadThreshold; + } + + public startScan_args setReadaheadThreshold(long readaheadThreshold) { + this.readaheadThreshold = readaheadThreshold; + setReadaheadThresholdIsSet(true); + return this; + } + + public void unsetReadaheadThreshold() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __READAHEADTHRESHOLD_ISSET_ID); + } + + /** Returns true if field readaheadThreshold is set (has been assigned a value) and false otherwise */ + public boolean isSetReadaheadThreshold() { + return EncodingUtils.testBit(__isset_bitfield, __READAHEADTHRESHOLD_ISSET_ID); + } + + public void setReadaheadThresholdIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __READAHEADTHRESHOLD_ISSET_ID, value); + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case TINFO: @@ -3527,6 +3568,14 @@ import org.slf4j.LoggerFactory; } break; + case READAHEAD_THRESHOLD: + if (value == null) { + unsetReadaheadThreshold(); + } else { + setReadaheadThreshold((Long)value); + } + break; + } } @@ -3565,6 +3614,9 @@ import org.slf4j.LoggerFactory; case ISOLATED: return Boolean.valueOf(isIsolated()); + case READAHEAD_THRESHOLD: + return Long.valueOf(getReadaheadThreshold()); + } throw new IllegalStateException(); } @@ -3598,6 +3650,8 @@ import org.slf4j.LoggerFactory; return isSetWaitForWrites(); case ISOLATED: return isSetIsolated(); + case READAHEAD_THRESHOLD: + return isSetReadaheadThreshold(); } throw new IllegalStateException(); } @@ -3714,6 +3768,15 @@ import org.slf4j.LoggerFactory; return false; } + boolean this_present_readaheadThreshold = true; + boolean that_present_readaheadThreshold = true; + if (this_present_readaheadThreshold || that_present_readaheadThreshold) { + if (!(this_present_readaheadThreshold && that_present_readaheadThreshold)) + return false; + if (this.readaheadThreshold != that.readaheadThreshold) + return false; + } + return true; } @@ -3840,6 +3903,16 @@ import org.slf4j.LoggerFactory; return lastComparison; } } + lastComparison = Boolean.valueOf(isSetReadaheadThreshold()).compareTo(typedOther.isSetReadaheadThreshold()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetReadaheadThreshold()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.readaheadThreshold, typedOther.readaheadThreshold); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -3935,6 +4008,10 @@ import org.slf4j.LoggerFactory; sb.append("isolated:"); sb.append(this.isolated); first = false; + if (!first) sb.append(", "); + sb.append("readaheadThreshold:"); + sb.append(this.readaheadThreshold); + first = false; sb.append(")"); return sb.toString(); } @@ -4140,6 +4217,14 @@ import org.slf4j.LoggerFactory; org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 12: // READAHEAD_THRESHOLD + if (schemeField.type == org.apache.thrift.protocol.TType.I64) { + struct.readaheadThreshold = iprot.readI64(); + struct.setReadaheadThresholdIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -4241,6 +4326,9 @@ import org.slf4j.LoggerFactory; struct.tinfo.write(oprot); oprot.writeFieldEnd(); } + oprot.writeFieldBegin(READAHEAD_THRESHOLD_FIELD_DESC); + oprot.writeI64(struct.readaheadThreshold); + oprot.writeFieldEnd(); oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -4292,7 +4380,10 @@ import org.slf4j.LoggerFactory; if (struct.isSetIsolated()) { optionals.set(10); } - oprot.writeBitSet(optionals, 11); + if (struct.isSetReadaheadThreshold()) { + optionals.set(11); + } + oprot.writeBitSet(optionals, 12); if (struct.isSetTinfo()) { struct.tinfo.write(oprot); } @@ -4358,12 +4449,15 @@ import org.slf4j.LoggerFactory; if (struct.isSetIsolated()) { oprot.writeBool(struct.isolated); } + if (struct.isSetReadaheadThreshold()) { + oprot.writeI64(struct.readaheadThreshold); + } } @Override public void read(org.apache.thrift.protocol.TProtocol prot, startScan_args struct) throws org.apache.thrift.TException { TTupleProtocol iprot = (TTupleProtocol) prot; - BitSet incoming = iprot.readBitSet(11); + BitSet incoming = iprot.readBitSet(12); if (incoming.get(0)) { struct.tinfo = new org.apache.accumulo.trace.thrift.TInfo(); struct.tinfo.read(iprot); @@ -4463,6 +4557,10 @@ import org.slf4j.LoggerFactory; struct.isolated = iprot.readBool(); struct.setIsolatedIsSet(true); } + if (incoming.get(11)) { + struct.readaheadThreshold = iprot.readI64(); + struct.setReadaheadThresholdIsSet(true); + } } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/05b3359b/core/src/main/thrift/tabletserver.thrift ---------------------------------------------------------------------- diff --git a/core/src/main/thrift/tabletserver.thrift b/core/src/main/thrift/tabletserver.thrift index 4f9f13a..25e0b10 100644 --- a/core/src/main/thrift/tabletserver.thrift +++ b/core/src/main/thrift/tabletserver.thrift @@ -139,7 +139,8 @@ service TabletClientService extends client.ClientService { 7:map<string, map<string, string>> ssio, 8:list<binary> authorizations 9:bool waitForWrites, - 10:bool isolated) throws (1:client.ThriftSecurityException sec, 2:NotServingTabletException nste, 3:TooManyFilesException tmfe), + 10:bool isolated, + 12:i64 readaheadThreshold) throws (1:client.ThriftSecurityException sec, 2:NotServingTabletException nste, 3:TooManyFilesException tmfe), data.ScanResult continueScan(2:trace.TInfo tinfo, 1:data.ScanID scanID) throws (1:NoSuchScanIDException nssi, 2:NotServingTabletException nste, 3:TooManyFilesException tmfe), oneway void closeScan(2:trace.TInfo tinfo, 1:data.ScanID scanID), http://git-wip-us.apache.org/repos/asf/accumulo/blob/05b3359b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java index abb8750..56f03af 100644 --- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java +++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java @@ -795,6 +795,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu public volatile ScanTask<ScanBatch> nextBatchTask; public AtomicBoolean interruptFlag; public Scanner scanner; + public long readaheadThreshold = Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD; @Override public void cleanup() { @@ -1156,9 +1157,9 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu @Override public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent textent, TRange range, List<TColumn> columns, int batchSize, - List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated) - throws NotServingTabletException, ThriftSecurityException, org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException { - + List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, + long readaheadThreshold) throws NotServingTabletException, ThriftSecurityException, org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException { + Authorizations userauths = null; if (!security.canScan(credentials, new String(textent.getTable()), range, columns, ssiList, ssio, authorizations)) throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); @@ -1195,6 +1196,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu scanSession.ssio = ssio; scanSession.auths = new Authorizations(authorizations); scanSession.interruptFlag = new AtomicBoolean(); + scanSession.readaheadThreshold = readaheadThreshold; for (TColumn tcolumn : columns) { scanSession.columnSet.add(new Column(tcolumn)); @@ -1277,7 +1279,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu scanSession.batchCount++; - if (scanResult.more && scanSession.batchCount > 3) { + if (scanResult.more && scanSession.batchCount > scanSession.readaheadThreshold) { // start reading next batch while current batch is transmitted // to client scanSession.nextBatchTask = new NextBatchTask(scanID, scanSession.interruptFlag); http://git-wip-us.apache.org/repos/asf/accumulo/blob/05b3359b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java index 9bb7604..f4eb234 100644 --- a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java @@ -138,7 +138,7 @@ public class NullTserver { @Override public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent extent, TRange range, List<TColumn> columns, int batchSize, - List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated) { + List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold) { return null; }
