Updated Branches: refs/heads/ACCUMULO-1566 [created] ff95c7147
ACCUMULO-1566 Pass down the readaheadThreshold parameter from the client to the server so that the same limit is adhered to by the server with regard to pipelining. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ff95c714 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ff95c714 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ff95c714 Branch: refs/heads/ACCUMULO-1566 Commit: ff95c7147d210171fef3824eae399b7384cdaff9 Parents: e70a40d Author: Josh Elser <els...@apache.org> Authored: Tue Oct 8 18:54:30 2013 -0400 Committer: Josh Elser <els...@apache.org> Committed: Tue Oct 8 18:54:30 2013 -0400 ---------------------------------------------------------------------- .../core/client/impl/ThriftScanner.java | 15 ++- .../thrift/TabletClientService.java | 124 +++++++++++++++++-- core/src/main/thrift/tabletserver.thrift | 3 +- .../server/tabletserver/TabletServer.java | 10 +- .../test/performance/thrift/NullTserver.java | 2 +- 5 files changed, 132 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/ff95c714/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java index efdd142..efb31e8 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java @@ -27,6 +27,7 @@ import java.util.Set; import java.util.SortedMap; import java.util.SortedSet; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import 
org.apache.accumulo.core.client.Instance; @@ -97,7 +98,7 @@ public class ThriftScanner { boolean waitForWrites = !serversWaitedForWrites.get(ttype).contains(server); InitialScan isr = client.startScan(tinfo, scanState.credentials.toThrift(instance), extent.toThrift(), scanState.range.toThrift(), Translator.translate(scanState.columns, Translator.CT), scanState.size, scanState.serverSideIteratorList, scanState.serverSideIteratorOptions, - scanState.authorizations.getAuthorizationsBB(), waitForWrites, scanState.isolated); + scanState.authorizations.getAuthorizationsBB(), waitForWrites, scanState.isolated, scanState.readaheadThreshold); if (waitForWrites) serversWaitedForWrites.get(ttype).add(server); @@ -132,6 +133,7 @@ public class ThriftScanner { Text tableId; Text startRow; boolean skipStartRow; + long readaheadThreshold; Range range; @@ -150,9 +152,15 @@ public class ThriftScanner { List<IterInfo> serverSideIteratorList; Map<String,Map<String,String>> serverSideIteratorOptions; - + public ScanState(Instance instance, Credentials credentials, Text tableId, Authorizations authorizations, Range range, SortedSet<Column> fetchedColumns, int size, List<IterInfo> serverSideIteratorList, Map<String,Map<String,String>> serverSideIteratorOptions, boolean isolated) { + this(instance, credentials, tableId, authorizations, range, fetchedColumns, size, serverSideIteratorList, serverSideIteratorOptions, isolated, + Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD); + } + + public ScanState(Instance instance, Credentials credentials, Text tableId, Authorizations authorizations, Range range, SortedSet<Column> fetchedColumns, + int size, List<IterInfo> serverSideIteratorList, Map<String,Map<String,String>> serverSideIteratorOptions, boolean isolated, long readaheadThreshold) { this.instance = instance; this.credentials = credentials; this.authorizations = authorizations; @@ -179,6 +187,7 @@ public class ThriftScanner { this.serverSideIteratorOptions = serverSideIteratorOptions; 
this.isolated = isolated; + this.readaheadThreshold = readaheadThreshold; } } @@ -389,7 +398,7 @@ public class ThriftScanner { boolean waitForWrites = !serversWaitedForWrites.get(ttype).contains(loc.tablet_location); InitialScan is = client.startScan(tinfo, scanState.credentials.toThrift(scanState.instance), loc.tablet_extent.toThrift(), scanState.range.toThrift(), Translator.translate(scanState.columns, Translator.CT), scanState.size, scanState.serverSideIteratorList, scanState.serverSideIteratorOptions, - scanState.authorizations.getAuthorizationsBB(), waitForWrites, scanState.isolated); + scanState.authorizations.getAuthorizationsBB(), waitForWrites, scanState.isolated, scanState.readaheadThreshold); if (waitForWrites) serversWaitedForWrites.get(ttype).add(loc.tablet_location); http://git-wip-us.apache.org/repos/asf/accumulo/blob/ff95c714/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java b/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java index bd6578d..d02b5da 100644 --- a/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java +++ b/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java @@ -50,7 +50,7 @@ import org.slf4j.LoggerFactory; public interface Iface extends org.apache.accumulo.core.client.impl.thrift.ClientService.Iface { - public org.apache.accumulo.core.data.thrift.InitialScan startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, 
Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, NotServingTabletException, TooManyFilesException, org.apache.thrift.TException; + public org.apache.accumulo.core.data.thrift.InitialScan startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, NotServingTabletException, TooManyFilesException, org.apache.thrift.TException; public org.apache.accumulo.core.data.thrift.ScanResult continueScan(org.apache.accumulo.trace.thrift.TInfo tinfo, long scanID) throws NoSuchScanIDException, NotServingTabletException, TooManyFilesException, org.apache.thrift.TException; @@ -114,7 +114,7 @@ import org.slf4j.LoggerFactory; public interface AsyncIface extends org.apache.accumulo.core.client.impl.thrift.ClientService .AsyncIface { - public void startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.startScan_call> resultHandler) throws org.apache.thrift.TException; + 
public void startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.startScan_call> resultHandler) throws org.apache.thrift.TException; public void continueScan(org.apache.accumulo.trace.thrift.TInfo tinfo, long scanID, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.continueScan_call> resultHandler) throws org.apache.thrift.TException; @@ -196,13 +196,13 @@ import org.slf4j.LoggerFactory; super(iprot, oprot); } - public org.apache.accumulo.core.data.thrift.InitialScan startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, NotServingTabletException, TooManyFilesException, org.apache.thrift.TException + public org.apache.accumulo.core.data.thrift.InitialScan startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, 
List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, NotServingTabletException, TooManyFilesException, org.apache.thrift.TException { - send_startScan(tinfo, credentials, extent, range, columns, batchSize, ssiList, ssio, authorizations, waitForWrites, isolated); + send_startScan(tinfo, credentials, extent, range, columns, batchSize, ssiList, ssio, authorizations, waitForWrites, isolated, readaheadThreshold); return recv_startScan(); } - public void send_startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated) throws org.apache.thrift.TException + public void send_startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold) throws org.apache.thrift.TException { startScan_args args = new startScan_args(); args.setTinfo(tinfo); @@ -216,6 +216,7 @@ import org.slf4j.LoggerFactory; args.setAuthorizations(authorizations); args.setWaitForWrites(waitForWrites); args.setIsolated(isolated); + 
args.setReadaheadThreshold(readaheadThreshold); sendBase("startScan", args); } @@ -922,9 +923,9 @@ import org.slf4j.LoggerFactory; super(protocolFactory, clientManager, transport); } - public void startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, org.apache.thrift.async.AsyncMethodCallback<startScan_call> resultHandler) throws org.apache.thrift.TException { + public void startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold, org.apache.thrift.async.AsyncMethodCallback<startScan_call> resultHandler) throws org.apache.thrift.TException { checkReady(); - startScan_call method_call = new startScan_call(tinfo, credentials, extent, range, columns, batchSize, ssiList, ssio, authorizations, waitForWrites, isolated, resultHandler, this, ___protocolFactory, ___transport); + startScan_call method_call = new startScan_call(tinfo, credentials, extent, range, columns, batchSize, ssiList, ssio, authorizations, waitForWrites, isolated, readaheadThreshold, resultHandler, this, ___protocolFactory, ___transport); this.___currentMethod = method_call; ___manager.call(method_call); } @@ -941,7 +942,8 @@ import 
org.slf4j.LoggerFactory; private List<ByteBuffer> authorizations; private boolean waitForWrites; private boolean isolated; - public startScan_call(org.apache.accumulo.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, org.apache.thrift.async.AsyncMethodCallback<startScan_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { + private long readaheadThreshold; + public startScan_call(org.apache.accumulo.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold, org.apache.thrift.async.AsyncMethodCallback<startScan_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { super(client, protocolFactory, transport, resultHandler, false); this.tinfo = tinfo; this.credentials = credentials; @@ -954,6 +956,7 @@ import org.slf4j.LoggerFactory; this.authorizations = authorizations; this.waitForWrites = waitForWrites; this.isolated = isolated; + 
this.readaheadThreshold = readaheadThreshold; } public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException { @@ -970,6 +973,7 @@ import org.slf4j.LoggerFactory; args.setAuthorizations(authorizations); args.setWaitForWrites(waitForWrites); args.setIsolated(isolated); + args.setReadaheadThreshold(readaheadThreshold); args.write(prot); prot.writeMessageEnd(); } @@ -2170,7 +2174,7 @@ import org.slf4j.LoggerFactory; public startScan_result getResult(I iface, startScan_args args) throws org.apache.thrift.TException { startScan_result result = new startScan_result(); try { - result.success = iface.startScan(args.tinfo, args.credentials, args.extent, args.range, args.columns, args.batchSize, args.ssiList, args.ssio, args.authorizations, args.waitForWrites, args.isolated); + result.success = iface.startScan(args.tinfo, args.credentials, args.extent, args.range, args.columns, args.batchSize, args.ssiList, args.ssio, args.authorizations, args.waitForWrites, args.isolated, args.readaheadThreshold); } catch (org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException sec) { result.sec = sec; } catch (NotServingTabletException nste) { @@ -2846,6 +2850,7 @@ import org.slf4j.LoggerFactory; private static final org.apache.thrift.protocol.TField AUTHORIZATIONS_FIELD_DESC = new org.apache.thrift.protocol.TField("authorizations", org.apache.thrift.protocol.TType.LIST, (short)8); private static final org.apache.thrift.protocol.TField WAIT_FOR_WRITES_FIELD_DESC = new org.apache.thrift.protocol.TField("waitForWrites", org.apache.thrift.protocol.TType.BOOL, (short)9); private static final org.apache.thrift.protocol.TField ISOLATED_FIELD_DESC = new org.apache.thrift.protocol.TField("isolated", org.apache.thrift.protocol.TType.BOOL, (short)10); + private static final org.apache.thrift.protocol.TField READAHEAD_THRESHOLD_FIELD_DESC = new org.apache.thrift.protocol.TField("readaheadThreshold", org.apache.thrift.protocol.TType.I64, (short)12); 
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); static { @@ -2864,6 +2869,7 @@ import org.slf4j.LoggerFactory; public List<ByteBuffer> authorizations; // required public boolean waitForWrites; // required public boolean isolated; // required + public long readaheadThreshold; // required /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -2877,7 +2883,8 @@ import org.slf4j.LoggerFactory; SSIO((short)7, "ssio"), AUTHORIZATIONS((short)8, "authorizations"), WAIT_FOR_WRITES((short)9, "waitForWrites"), - ISOLATED((short)10, "isolated"); + ISOLATED((short)10, "isolated"), + READAHEAD_THRESHOLD((short)12, "readaheadThreshold"); private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); @@ -2914,6 +2921,8 @@ import org.slf4j.LoggerFactory; return WAIT_FOR_WRITES; case 10: // ISOLATED return ISOLATED; + case 12: // READAHEAD_THRESHOLD + return READAHEAD_THRESHOLD; default: return null; } @@ -2957,6 +2966,7 @@ import org.slf4j.LoggerFactory; private static final int __BATCHSIZE_ISSET_ID = 0; private static final int __WAITFORWRITES_ISSET_ID = 1; private static final int __ISOLATED_ISSET_ID = 2; + private static final int __READAHEADTHRESHOLD_ISSET_ID = 3; private byte __isset_bitfield = 0; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { @@ -2990,6 +3000,8 @@ import org.slf4j.LoggerFactory; new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); tmpMap.put(_Fields.ISOLATED, new org.apache.thrift.meta_data.FieldMetaData("isolated", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); + tmpMap.put(_Fields.READAHEAD_THRESHOLD, new 
org.apache.thrift.meta_data.FieldMetaData("readaheadThreshold", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(startScan_args.class, metaDataMap); } @@ -3008,7 +3020,8 @@ import org.slf4j.LoggerFactory; Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, - boolean isolated) + boolean isolated, + long readaheadThreshold) { this(); this.tinfo = tinfo; @@ -3025,6 +3038,8 @@ import org.slf4j.LoggerFactory; setWaitForWritesIsSet(true); this.isolated = isolated; setIsolatedIsSet(true); + this.readaheadThreshold = readaheadThreshold; + setReadaheadThresholdIsSet(true); } /** @@ -3096,6 +3111,7 @@ import org.slf4j.LoggerFactory; } this.waitForWrites = other.waitForWrites; this.isolated = other.isolated; + this.readaheadThreshold = other.readaheadThreshold; } public startScan_args deepCopy() { @@ -3118,6 +3134,8 @@ import org.slf4j.LoggerFactory; this.waitForWrites = false; setIsolatedIsSet(false); this.isolated = false; + setReadaheadThresholdIsSet(false); + this.readaheadThreshold = 0; } public org.apache.accumulo.trace.thrift.TInfo getTinfo() { @@ -3437,6 +3455,29 @@ import org.slf4j.LoggerFactory; __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __ISOLATED_ISSET_ID, value); } + public long getReadaheadThreshold() { + return this.readaheadThreshold; + } + + public startScan_args setReadaheadThreshold(long readaheadThreshold) { + this.readaheadThreshold = readaheadThreshold; + setReadaheadThresholdIsSet(true); + return this; + } + + public void unsetReadaheadThreshold() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __READAHEADTHRESHOLD_ISSET_ID); + } + + /** Returns true if field readaheadThreshold is set (has been assigned a value) and false otherwise */ + public boolean isSetReadaheadThreshold() { 
+ return EncodingUtils.testBit(__isset_bitfield, __READAHEADTHRESHOLD_ISSET_ID); + } + + public void setReadaheadThresholdIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __READAHEADTHRESHOLD_ISSET_ID, value); + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case TINFO: @@ -3527,6 +3568,14 @@ import org.slf4j.LoggerFactory; } break; + case READAHEAD_THRESHOLD: + if (value == null) { + unsetReadaheadThreshold(); + } else { + setReadaheadThreshold((Long)value); + } + break; + } } @@ -3565,6 +3614,9 @@ import org.slf4j.LoggerFactory; case ISOLATED: return Boolean.valueOf(isIsolated()); + case READAHEAD_THRESHOLD: + return Long.valueOf(getReadaheadThreshold()); + } throw new IllegalStateException(); } @@ -3598,6 +3650,8 @@ import org.slf4j.LoggerFactory; return isSetWaitForWrites(); case ISOLATED: return isSetIsolated(); + case READAHEAD_THRESHOLD: + return isSetReadaheadThreshold(); } throw new IllegalStateException(); } @@ -3714,6 +3768,15 @@ import org.slf4j.LoggerFactory; return false; } + boolean this_present_readaheadThreshold = true; + boolean that_present_readaheadThreshold = true; + if (this_present_readaheadThreshold || that_present_readaheadThreshold) { + if (!(this_present_readaheadThreshold && that_present_readaheadThreshold)) + return false; + if (this.readaheadThreshold != that.readaheadThreshold) + return false; + } + return true; } @@ -3840,6 +3903,16 @@ import org.slf4j.LoggerFactory; return lastComparison; } } + lastComparison = Boolean.valueOf(isSetReadaheadThreshold()).compareTo(typedOther.isSetReadaheadThreshold()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetReadaheadThreshold()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.readaheadThreshold, typedOther.readaheadThreshold); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -3935,6 +4008,10 @@ import org.slf4j.LoggerFactory; sb.append("isolated:"); 
sb.append(this.isolated); first = false; + if (!first) sb.append(", "); + sb.append("readaheadThreshold:"); + sb.append(this.readaheadThreshold); + first = false; sb.append(")"); return sb.toString(); } @@ -4140,6 +4217,14 @@ import org.slf4j.LoggerFactory; org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 12: // READAHEAD_THRESHOLD + if (schemeField.type == org.apache.thrift.protocol.TType.I64) { + struct.readaheadThreshold = iprot.readI64(); + struct.setReadaheadThresholdIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -4241,6 +4326,9 @@ import org.slf4j.LoggerFactory; struct.tinfo.write(oprot); oprot.writeFieldEnd(); } + oprot.writeFieldBegin(READAHEAD_THRESHOLD_FIELD_DESC); + oprot.writeI64(struct.readaheadThreshold); + oprot.writeFieldEnd(); oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -4292,7 +4380,10 @@ import org.slf4j.LoggerFactory; if (struct.isSetIsolated()) { optionals.set(10); } - oprot.writeBitSet(optionals, 11); + if (struct.isSetReadaheadThreshold()) { + optionals.set(11); + } + oprot.writeBitSet(optionals, 12); if (struct.isSetTinfo()) { struct.tinfo.write(oprot); } @@ -4358,12 +4449,15 @@ import org.slf4j.LoggerFactory; if (struct.isSetIsolated()) { oprot.writeBool(struct.isolated); } + if (struct.isSetReadaheadThreshold()) { + oprot.writeI64(struct.readaheadThreshold); + } } @Override public void read(org.apache.thrift.protocol.TProtocol prot, startScan_args struct) throws org.apache.thrift.TException { TTupleProtocol iprot = (TTupleProtocol) prot; - BitSet incoming = iprot.readBitSet(11); + BitSet incoming = iprot.readBitSet(12); if (incoming.get(0)) { struct.tinfo = new org.apache.accumulo.trace.thrift.TInfo(); struct.tinfo.read(iprot); @@ -4463,6 +4557,10 @@ import org.slf4j.LoggerFactory; struct.isolated = iprot.readBool(); 
struct.setIsolatedIsSet(true); } + if (incoming.get(11)) { + struct.readaheadThreshold = iprot.readI64(); + struct.setReadaheadThresholdIsSet(true); + } } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/ff95c714/core/src/main/thrift/tabletserver.thrift ---------------------------------------------------------------------- diff --git a/core/src/main/thrift/tabletserver.thrift b/core/src/main/thrift/tabletserver.thrift index 4f9f13a..25e0b10 100644 --- a/core/src/main/thrift/tabletserver.thrift +++ b/core/src/main/thrift/tabletserver.thrift @@ -139,7 +139,8 @@ service TabletClientService extends client.ClientService { 7:map<string, map<string, string>> ssio, 8:list<binary> authorizations 9:bool waitForWrites, - 10:bool isolated) throws (1:client.ThriftSecurityException sec, 2:NotServingTabletException nste, 3:TooManyFilesException tmfe), + 10:bool isolated, + 12:i64 readaheadThreshold) throws (1:client.ThriftSecurityException sec, 2:NotServingTabletException nste, 3:TooManyFilesException tmfe), data.ScanResult continueScan(2:trace.TInfo tinfo, 1:data.ScanID scanID) throws (1:NoSuchScanIDException nssi, 2:NotServingTabletException nste, 3:TooManyFilesException tmfe), oneway void closeScan(2:trace.TInfo tinfo, 1:data.ScanID scanID), http://git-wip-us.apache.org/repos/asf/accumulo/blob/ff95c714/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java index abb8750..56f03af 100644 --- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java +++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java @@ -795,6 +795,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu public volatile ScanTask<ScanBatch> nextBatchTask; public 
AtomicBoolean interruptFlag; public Scanner scanner; + public long readaheadThreshold = Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD; @Override public void cleanup() { @@ -1156,9 +1157,9 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu @Override public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent textent, TRange range, List<TColumn> columns, int batchSize, - List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated) - throws NotServingTabletException, ThriftSecurityException, org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException { - + List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, + long readaheadThreshold) throws NotServingTabletException, ThriftSecurityException, org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException { + Authorizations userauths = null; if (!security.canScan(credentials, new String(textent.getTable()), range, columns, ssiList, ssio, authorizations)) throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); @@ -1195,6 +1196,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu scanSession.ssio = ssio; scanSession.auths = new Authorizations(authorizations); scanSession.interruptFlag = new AtomicBoolean(); + scanSession.readaheadThreshold = readaheadThreshold; for (TColumn tcolumn : columns) { scanSession.columnSet.add(new Column(tcolumn)); @@ -1277,7 +1279,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu scanSession.batchCount++; - if (scanResult.more && scanSession.batchCount > 3) { + if (scanResult.more && scanSession.batchCount > scanSession.readaheadThreshold) { // start reading next batch while current batch is transmitted // to client scanSession.nextBatchTask = new 
NextBatchTask(scanID, scanSession.interruptFlag); http://git-wip-us.apache.org/repos/asf/accumulo/blob/ff95c714/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java index 9bb7604..f4eb234 100644 --- a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java @@ -138,7 +138,7 @@ public class NullTserver { @Override public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent extent, TRange range, List<TColumn> columns, int batchSize, - List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated) { + List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold) { return null; }