This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
     new 3787f1b49d Pauses bulk imports into tablets w/ too many files (#5104)
3787f1b49d is described below

commit 3787f1b49dfe21e73257443708477b2d6c59286c
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Fri Dec 6 16:24:25 2024 -0500

Pauses bulk imports into tablets w/ too many files (#5104)

Bulk imports can add files to a tablet faster than compactions can shrink the number of files. There are many scenarios that can cause this; the following are some of them:

* Compactors are all busy when new bulk imports arrive.
* Many processes bulk import a few files to a single tablet at around the same time.
* A single process bulk imports a lot of files to a single tablet.

When a tablet has too many files it can eventually cause cascading problems for compaction and scan. The change adds two properties to help avoid this problem.

The first property is `table.file.pause`. It pauses bulk imports, and eventually minor compactions, when a tablet's current file count exceeds this property. The default is unlimited, so by default nothing is ever paused.

The second property is `table.bulk.max.tablet.files`. It determines the maximum number of files a bulk import can add to a single tablet. When this limit is exceeded, the bulk import operation fails w/o making changes to any tablets.

Below is an example of how these properties behave (see also the client sketch after this message):

1. Set table.file.pause=30
2. Set table.bulk.max.tablet.files=100
3. Import 20 files into tablet A; tablet A now has 20 files.
4. Import 20 files into tablet A; tablet A now has 40 files.
5. Import 20 files into tablet A. Because the tablet currently has 40 files and the pause limit is 30, this bulk import pauses.
6. Tablet A compacts 10 files, leaving tablet A with 31 files. That is still above the pause limit, so the bulk import does not progress.
7. Tablet A compacts 10 files, leaving tablet A with 22 files.
8. The paused bulk import proceeds; tablet A now has 42 files.
9. Import 200 files into tablet B and one file into tablet A. This operation fails w/o changing tablet A or B because 200 exceeds the value of table.bulk.max.tablet.files.

While making this change, two preexisting problems were found.

The first was with bulk import setting time. For the case of multiple files, the behavior of setting time was incorrect and inconsistent depending on the table time type and on whether the tablet was hosted or not. Made the behavior consistent for hosted and unhosted tablets and for both table time types: a single timestamp is now allocated for all files in all cases, whereas the code used to allocate a different number of timestamps in the four different cases. The old behavior was causing tablet refresh, and therefore these changes, to fail, so the existing issue had to be fixed before progress could be made here. The new test in this PR that adds lots of files to a single tablet and requests that bulk import set time uncovered the problem.

The second problem was that the existing code had handling for the case of a subset of files being added to a tablet by bulk import. This should never happen because files are added via a single mutation: either the entire mutation goes through or nothing does. Removed this handling and changed the code to throw an exception if a non-empty proper subset is seen. This change greatly simplified implementing this feature.
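[Editor's illustration, not part of the commit] A minimal client-side sketch of the walkthrough above, assuming an existing table; the client-properties path, table name, and bulk directory are placeholders. It sets the two new properties through the public table-operations API and then runs a new-API bulk import, which would pause or fail as the commit describes once the limits are hit.

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;

public class BulkPauseExample {
  public static void main(String[] args) throws Exception {
    // Placeholder client properties path; the table is assumed to already exist.
    try (AccumuloClient client =
        Accumulo.newClient().from("/path/to/accumulo-client.properties").build()) {
      String table = "example_table"; // placeholder table name

      // Pause bulk imports (and eventually minor compactions) into any tablet
      // that currently has more than 30 files.
      client.tableOperations().setProperty(table, "table.file.pause", "30");

      // Fail a bulk import outright, without changing any tablets, if it would
      // add more than 100 files to a single tablet.
      client.tableOperations().setProperty(table, "table.bulk.max.tablet.files", "100");

      // New bulk import API: this load waits while target tablets are over the
      // pause limit and fails up front if any tablet in the load mapping would
      // receive more than 100 files.
      client.tableOperations().importDirectory("/path/to/bulk/dir").to(table).load();
    }
  }
}

The same importDirectory(...).to(...).load() call is exercised by the new BulkNewIT tests in the diff below.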
fixes #5023 --- .../org/apache/accumulo/core/conf/Property.java | 44 ++-- .../thrift/TabletServerClientService.java | 132 ++---------- core/src/main/thrift/tabletserver.thrift | 1 - .../manager/tableOps/bulkVer2/LoadFiles.java | 121 +++++++---- .../manager/tableOps/bulkVer2/PrepBulkImport.java | 22 +- .../manager/tableOps/bulkVer2/TabletRefresher.java | 1 - .../tableOps/bulkVer2/PrepBulkImportTest.java | 2 +- .../accumulo/tserver/TabletClientHandler.java | 5 +- .../org/apache/accumulo/tserver/tablet/Tablet.java | 19 +- .../apache/accumulo/test/functional/BulkNewIT.java | 231 ++++++++++++++++++++- .../accumulo/test/performance/NullTserver.java | 2 +- 11 files changed, 375 insertions(+), 205 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index b9b7f72181..f330249998 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -921,6 +921,15 @@ public enum Property { "The maximum amount of memory that will be used to cache results of a client query/scan. " + "Once this limit is reached, the buffered data is sent to the client.", "1.3.5"), + TABLE_BULK_MAX_TABLETS("table.bulk.max.tablets", "0", PropertyType.COUNT, + "The maximum number of tablets allowed for one bulk import file. Value of 0 is Unlimited. " + + "This property is only enforced in the new bulk import API.", + "2.1.0"), + TABLE_BULK_MAX_TABLET_FILES("table.bulk.max.tablet.files", "0", PropertyType.COUNT, + "The maximum number of files a bulk import can add to a single tablet. When this property " + + "is exceeded for any tablet the entire bulk import operation will fail before any making " + + "changes. Value of 0 is unlimited.", + "4.0.0"), TABLE_FILE_TYPE("table.file.type", RFile.EXTENSION, PropertyType.FILENAME_EXT, "Change the type of file a table writes.", "1.3.5"), TABLE_LOAD_BALANCER("table.balancer", "org.apache.accumulo.core.spi.balancer.SimpleLoadBalancer", @@ -947,17 +956,28 @@ public enum Property { + " defaults are used.", "1.3.5"), TABLE_FILE_MAX("table.file.max", "15", PropertyType.COUNT, - "The maximum number of RFiles each tablet in a table can have. When" - + " adjusting this property you may want to consider adjusting" - + " table.compaction.major.ratio also. Setting this property to 0 will make" - + " it default to tserver.scan.files.open.max-1, this will prevent a tablet" - + " from having more RFiles than can be opened. Prior to 2.1.0 this property" - + " was used to trigger merging minor compactions, but merging minor compactions" - + " were removed in 2.1.0. Now this property is only used by the" - + " DefaultCompactionStrategy and the RatioBasedCompactionPlanner." - + " The RatioBasedCompactionPlanner started using this property in 2.1.3, before" - + " that it did not use the property.", + "This property is used to signal to the compaction planner that it should be more " + + "aggressive for compacting tablets that exceed this limit. The " + + "RatioBasedCompactionPlanner will lower the compaction ratio and increase the " + + "priority for tablets that exceed this limit. When adjusting this property you may " + + "want to consider adjusting table.compaction.major.ratio also. 
Setting this property " + + "to 0 will make it default to tserver.scan.files.open.max-1, this will prevent a tablet" + + " from having more RFiles than can be opened by a scan.", "1.4.0"), + TABLE_FILE_PAUSE("table.file.pause", "0", PropertyType.COUNT, + "When a tablet has more than this number of files, bulk imports and minor compactions " + + "will wait until the tablet has less files before proceeding. This will cause back " + + "pressure on bulk imports and writes to tables when compactions are not keeping up. " + + "Only the number of files a tablet currently has is considered for pausing, the " + + "number of files a bulk import will add is not considered. This means a bulk import " + + "can surge above this limit once causing future bulk imports or minor compactions to " + + "pause until compactions can catch up. This property plus " + + TABLE_BULK_MAX_TABLET_FILES.getKey() + + " determines the total number of files a tablet could temporarily surge to based on bulk " + + "imports. Ideally this property would be set higher than " + TABLE_FILE_MAX.getKey() + + " so that compactions are more aggressive prior to reaching the pause point. Value of 0 is " + + "unlimited.", + "4.0.0"), TABLE_MERGE_FILE_MAX("table.merge.file.max", "10000", PropertyType.COUNT, "The maximum number of files that a merge operation will process. Before " + "merging a sum of the number of files in the merge range is computed and if it " @@ -994,10 +1014,6 @@ public enum Property { "1.3.5"), TABLE_BLOOM_HASHTYPE("table.bloom.hash.type", "murmur", PropertyType.STRING, "The bloom filter hash type.", "1.3.5"), - TABLE_BULK_MAX_TABLETS("table.bulk.max.tablets", "0", PropertyType.COUNT, - "The maximum number of tablets allowed for one bulk import file. Value of 0 is Unlimited. " - + "This property is only enforced in the new bulk import API.", - "2.1.0"), TABLE_DURABILITY("table.durability", "sync", PropertyType.DURABILITY, "The durability used to write to the write-ahead log. 
Legal values are:" + " none, which skips the write-ahead log; log, which sends the data to the" diff --git a/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TabletServerClientService.java b/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TabletServerClientService.java index 9eb4e3ea7e..9c66164840 100644 --- a/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TabletServerClientService.java +++ b/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TabletServerClientService.java @@ -57,7 +57,7 @@ public class TabletServerClientService { public java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> refreshTablets(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tabletsToRefresh) throws org.apache.thrift.TException; - public java.util.Map<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent,java.lang.Long> allocateTimestamps(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tablets, int numStamps) throws org.apache.thrift.TException; + public java.util.Map<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent,java.lang.Long> allocateTimestamps(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tablets) throws org.apache.thrift.TException; } @@ -91,7 +91,7 @@ public class TabletServerClientService { public void refreshTablets(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tabletsToRefresh, org.apache.thrift.async.AsyncMethodCallback<java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent>> resultHandler) throws org.apache.thrift.TException; - public void allocateTimestamps(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tablets, int numStamps, org.apache.thrift.async.AsyncMethodCallback<java.util.Map<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent,java.lang.Long>> resultHandler) throws org.apache.thrift.TException; + public void allocateTimestamps(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tablets, org.apache.thrift.async.AsyncMethodCallback<java.util.Map<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent,java.lang.Long>> resultHandler) throws org.apache.thrift.TException; } @@ -477,19 +477,18 @@ public class TabletServerClientService { } @Override - public java.util.Map<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent,java.lang.Long> allocateTimestamps(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tablets, int numStamps) throws org.apache.thrift.TException + public java.util.Map<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent,java.lang.Long> 
allocateTimestamps(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tablets) throws org.apache.thrift.TException { - send_allocateTimestamps(tinfo, credentials, tablets, numStamps); + send_allocateTimestamps(tinfo, credentials, tablets); return recv_allocateTimestamps(); } - public void send_allocateTimestamps(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tablets, int numStamps) throws org.apache.thrift.TException + public void send_allocateTimestamps(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tablets) throws org.apache.thrift.TException { allocateTimestamps_args args = new allocateTimestamps_args(); args.setTinfo(tinfo); args.setCredentials(credentials); args.setTablets(tablets); - args.setNumStamps(numStamps); sendBase("allocateTimestamps", args); } @@ -1101,9 +1100,9 @@ public class TabletServerClientService { } @Override - public void allocateTimestamps(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tablets, int numStamps, org.apache.thrift.async.AsyncMethodCallback<java.util.Map<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent,java.lang.Long>> resultHandler) throws org.apache.thrift.TException { + public void allocateTimestamps(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tablets, org.apache.thrift.async.AsyncMethodCallback<java.util.Map<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent,java.lang.Long>> resultHandler) throws org.apache.thrift.TException { checkReady(); - allocateTimestamps_call method_call = new allocateTimestamps_call(tinfo, credentials, tablets, numStamps, resultHandler, this, ___protocolFactory, ___transport); + allocateTimestamps_call method_call = new allocateTimestamps_call(tinfo, credentials, tablets, resultHandler, this, ___protocolFactory, ___transport); this.___currentMethod = method_call; ___manager.call(method_call); } @@ -1112,13 +1111,11 @@ public class TabletServerClientService { private org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo; private org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials; private java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tablets; - private int numStamps; - public allocateTimestamps_call(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tablets, int numStamps, org.apache.thrift.async.AsyncMethodCallback<java.util.Map<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent,java.lang.Long>> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory proto [...] 
+ public allocateTimestamps_call(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tablets, org.apache.thrift.async.AsyncMethodCallback<java.util.Map<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent,java.lang.Long>> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org [...] super(client, protocolFactory, transport, resultHandler, false); this.tinfo = tinfo; this.credentials = credentials; this.tablets = tablets; - this.numStamps = numStamps; } @Override @@ -1128,7 +1125,6 @@ public class TabletServerClientService { args.setTinfo(tinfo); args.setCredentials(credentials); args.setTablets(tablets); - args.setNumStamps(numStamps); args.write(prot); prot.writeMessageEnd(); } @@ -1625,7 +1621,7 @@ public class TabletServerClientService { @Override public allocateTimestamps_result getResult(I iface, allocateTimestamps_args args) throws org.apache.thrift.TException { allocateTimestamps_result result = new allocateTimestamps_result(); - result.success = iface.allocateTimestamps(args.tinfo, args.credentials, args.tablets, args.numStamps); + result.success = iface.allocateTimestamps(args.tinfo, args.credentials, args.tablets); return result; } } @@ -2620,7 +2616,7 @@ public class TabletServerClientService { @Override public void start(I iface, allocateTimestamps_args args, org.apache.thrift.async.AsyncMethodCallback<java.util.Map<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent,java.lang.Long>> resultHandler) throws org.apache.thrift.TException { - iface.allocateTimestamps(args.tinfo, args.credentials, args.tablets, args.numStamps,resultHandler); + iface.allocateTimestamps(args.tinfo, args.credentials, args.tablets,resultHandler); } } @@ -16715,7 +16711,6 @@ public class TabletServerClientService { private static final org.apache.thrift.protocol.TField TINFO_FIELD_DESC = new org.apache.thrift.protocol.TField("tinfo", org.apache.thrift.protocol.TType.STRUCT, (short)1); private static final org.apache.thrift.protocol.TField CREDENTIALS_FIELD_DESC = new org.apache.thrift.protocol.TField("credentials", org.apache.thrift.protocol.TType.STRUCT, (short)2); private static final org.apache.thrift.protocol.TField TABLETS_FIELD_DESC = new org.apache.thrift.protocol.TField("tablets", org.apache.thrift.protocol.TType.LIST, (short)3); - private static final org.apache.thrift.protocol.TField NUM_STAMPS_FIELD_DESC = new org.apache.thrift.protocol.TField("numStamps", org.apache.thrift.protocol.TType.I32, (short)4); private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new allocateTimestamps_argsStandardSchemeFactory(); private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new allocateTimestamps_argsTupleSchemeFactory(); @@ -16723,14 +16718,12 @@ public class TabletServerClientService { public @org.apache.thrift.annotation.Nullable org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo; // required public @org.apache.thrift.annotation.Nullable org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials; // required public @org.apache.thrift.annotation.Nullable java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tablets; // required - public int numStamps; // required /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. 
*/ public enum _Fields implements org.apache.thrift.TFieldIdEnum { TINFO((short)1, "tinfo"), CREDENTIALS((short)2, "credentials"), - TABLETS((short)3, "tablets"), - NUM_STAMPS((short)4, "numStamps"); + TABLETS((short)3, "tablets"); private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>(); @@ -16752,8 +16745,6 @@ public class TabletServerClientService { return CREDENTIALS; case 3: // TABLETS return TABLETS; - case 4: // NUM_STAMPS - return NUM_STAMPS; default: return null; } @@ -16797,8 +16788,6 @@ public class TabletServerClientService { } // isset id assignments - private static final int __NUMSTAMPS_ISSET_ID = 0; - private byte __isset_bitfield = 0; public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -16809,8 +16798,6 @@ public class TabletServerClientService { tmpMap.put(_Fields.TABLETS, new org.apache.thrift.meta_data.FieldMetaData("tablets", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, org.apache.accumulo.core.dataImpl.thrift.TKeyExtent.class)))); - tmpMap.put(_Fields.NUM_STAMPS, new org.apache.thrift.meta_data.FieldMetaData("numStamps", org.apache.thrift.TFieldRequirementType.DEFAULT, - new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(allocateTimestamps_args.class, metaDataMap); } @@ -16821,22 +16808,18 @@ public class TabletServerClientService { public allocateTimestamps_args( org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, - java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tablets, - int numStamps) + java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tablets) { this(); this.tinfo = tinfo; this.credentials = credentials; this.tablets = tablets; - this.numStamps = numStamps; - setNumStampsIsSet(true); } /** * Performs a deep copy on <i>other</i>. 
*/ public allocateTimestamps_args(allocateTimestamps_args other) { - __isset_bitfield = other.__isset_bitfield; if (other.isSetTinfo()) { this.tinfo = new org.apache.accumulo.core.clientImpl.thrift.TInfo(other.tinfo); } @@ -16850,7 +16833,6 @@ public class TabletServerClientService { } this.tablets = __this__tablets; } - this.numStamps = other.numStamps; } @Override @@ -16863,8 +16845,6 @@ public class TabletServerClientService { this.tinfo = null; this.credentials = null; this.tablets = null; - setNumStampsIsSet(false); - this.numStamps = 0; } @org.apache.thrift.annotation.Nullable @@ -16958,29 +16938,6 @@ public class TabletServerClientService { } } - public int getNumStamps() { - return this.numStamps; - } - - public allocateTimestamps_args setNumStamps(int numStamps) { - this.numStamps = numStamps; - setNumStampsIsSet(true); - return this; - } - - public void unsetNumStamps() { - __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __NUMSTAMPS_ISSET_ID); - } - - /** Returns true if field numStamps is set (has been assigned a value) and false otherwise */ - public boolean isSetNumStamps() { - return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __NUMSTAMPS_ISSET_ID); - } - - public void setNumStampsIsSet(boolean value) { - __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __NUMSTAMPS_ISSET_ID, value); - } - @Override public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { switch (field) { @@ -17008,14 +16965,6 @@ public class TabletServerClientService { } break; - case NUM_STAMPS: - if (value == null) { - unsetNumStamps(); - } else { - setNumStamps((java.lang.Integer)value); - } - break; - } } @@ -17032,9 +16981,6 @@ public class TabletServerClientService { case TABLETS: return getTablets(); - case NUM_STAMPS: - return getNumStamps(); - } throw new java.lang.IllegalStateException(); } @@ -17053,8 +16999,6 @@ public class TabletServerClientService { return isSetCredentials(); case TABLETS: return isSetTablets(); - case NUM_STAMPS: - return isSetNumStamps(); } throw new java.lang.IllegalStateException(); } @@ -17099,15 +17043,6 @@ public class TabletServerClientService { return false; } - boolean this_present_numStamps = true; - boolean that_present_numStamps = true; - if (this_present_numStamps || that_present_numStamps) { - if (!(this_present_numStamps && that_present_numStamps)) - return false; - if (this.numStamps != that.numStamps) - return false; - } - return true; } @@ -17127,8 +17062,6 @@ public class TabletServerClientService { if (isSetTablets()) hashCode = hashCode * 8191 + tablets.hashCode(); - hashCode = hashCode * 8191 + numStamps; - return hashCode; } @@ -17170,16 +17103,6 @@ public class TabletServerClientService { return lastComparison; } } - lastComparison = java.lang.Boolean.compare(isSetNumStamps(), other.isSetNumStamps()); - if (lastComparison != 0) { - return lastComparison; - } - if (isSetNumStamps()) { - lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.numStamps, other.numStamps); - if (lastComparison != 0) { - return lastComparison; - } - } return 0; } @@ -17227,10 +17150,6 @@ public class TabletServerClientService { sb.append(this.tablets); } first = false; - if (!first) sb.append(", "); - sb.append("numStamps:"); - sb.append(this.numStamps); - first = false; sb.append(")"); return sb.toString(); } @@ -17256,8 +17175,6 @@ public class TabletServerClientService { private void readObject(java.io.ObjectInputStream in) throws 
java.io.IOException, java.lang.ClassNotFoundException { try { - // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. - __isset_bitfield = 0; read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); } catch (org.apache.thrift.TException te) { throw new java.io.IOException(te); @@ -17321,14 +17238,6 @@ public class TabletServerClientService { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; - case 4: // NUM_STAMPS - if (schemeField.type == org.apache.thrift.protocol.TType.I32) { - struct.numStamps = iprot.readI32(); - struct.setNumStampsIsSet(true); - } else { - org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); - } - break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -17367,9 +17276,6 @@ public class TabletServerClientService { } oprot.writeFieldEnd(); } - oprot.writeFieldBegin(NUM_STAMPS_FIELD_DESC); - oprot.writeI32(struct.numStamps); - oprot.writeFieldEnd(); oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -17398,10 +17304,7 @@ public class TabletServerClientService { if (struct.isSetTablets()) { optionals.set(2); } - if (struct.isSetNumStamps()) { - optionals.set(3); - } - oprot.writeBitSet(optionals, 4); + oprot.writeBitSet(optionals, 3); if (struct.isSetTinfo()) { struct.tinfo.write(oprot); } @@ -17417,15 +17320,12 @@ public class TabletServerClientService { } } } - if (struct.isSetNumStamps()) { - oprot.writeI32(struct.numStamps); - } } @Override public void read(org.apache.thrift.protocol.TProtocol prot, allocateTimestamps_args struct) throws org.apache.thrift.TException { org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot; - java.util.BitSet incoming = iprot.readBitSet(4); + java.util.BitSet incoming = iprot.readBitSet(3); if (incoming.get(0)) { struct.tinfo = new org.apache.accumulo.core.clientImpl.thrift.TInfo(); struct.tinfo.read(iprot); @@ -17450,10 +17350,6 @@ public class TabletServerClientService { } struct.setTabletsIsSet(true); } - if (incoming.get(3)) { - struct.numStamps = iprot.readI32(); - struct.setNumStampsIsSet(true); - } } } diff --git a/core/src/main/thrift/tabletserver.thrift b/core/src/main/thrift/tabletserver.thrift index bcf85d32be..1069445ac7 100644 --- a/core/src/main/thrift/tabletserver.thrift +++ b/core/src/main/thrift/tabletserver.thrift @@ -243,7 +243,6 @@ service TabletServerClientService { 1:client.TInfo tinfo 2:security.TCredentials credentials 3:list<data.TKeyExtent> tablets - 4:i32 numStamps ) } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java index 711387955d..62eec1ebe1 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.manager.tableOps.bulkVer2; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; @@ -25,6 +26,7 @@ import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import static org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType.CURRENT; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; @@ -54,6 +56,7 @@ import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; @@ -119,8 +122,10 @@ class LoadFiles extends ManagerRepo { private Map<KeyExtent,List<TabletFile>> loadingFiles; private long skipped = 0; + private long pauseLimit; - void start(Path bulkDir, Manager manager, FateId fateId, boolean setTime) throws Exception { + void start(Path bulkDir, Manager manager, TableId tableId, FateId fateId, boolean setTime) + throws Exception { this.bulkDir = bulkDir; this.manager = manager; this.fateId = fateId; @@ -128,6 +133,8 @@ class LoadFiles extends ManagerRepo { conditionalMutator = manager.getContext().getAmple().conditionallyMutateTablets(); this.skipped = 0; this.loadingFiles = new HashMap<>(); + this.pauseLimit = + manager.getContext().getTableConfiguration(tableId).getCount(Property.TABLE_FILE_PAUSE); } void load(List<TabletMetadata> tablets, Files files) { @@ -141,7 +148,18 @@ class LoadFiles extends ManagerRepo { tablets = tablets.stream().filter(tabletMeta -> { Set<ReferencedTabletFile> loaded = tabletMeta.getLoaded().keySet().stream() .map(StoredTabletFile::getTabletFile).collect(Collectors.toSet()); - return !loaded.containsAll(toLoad.keySet()); + boolean containsAll = loaded.containsAll(toLoad.keySet()); + // The tablet should either contain all loaded files or none. It should never contain a + // subset. Loaded files are written in single mutation to accumulo, either all changes in a + // mutation should go through or none. + Preconditions.checkState(containsAll || Collections.disjoint(loaded, toLoad.keySet()), + "Tablet %s has a subset of loaded files %s %s", tabletMeta.getExtent(), loaded, + toLoad.keySet()); + if (containsAll) { + log.trace("{} tablet {} has already loaded all files, nothing to do", fateId, + tabletMeta.getExtent()); + } + return !containsAll; }).collect(Collectors.toList()); // timestamps from tablets that are hosted on a tablet server @@ -155,7 +173,19 @@ class LoadFiles extends ManagerRepo { hostedTimestamps = Map.of(); } + List<ColumnType> rsc = new ArrayList<>(); + rsc.add(LOCATION); + if (setTime) { + rsc.add(TIME); + } + if (pauseLimit > 0) { + rsc.add(FILES); + } + + ColumnType[] requireSameCols = rsc.toArray(new ColumnType[0]); + for (TabletMetadata tablet : tablets) { + // Skip any tablets at the beginning of the loop before any work is done. 
if (setTime && tablet.getLocation() != null && !hostedTimestamps.containsKey(tablet.getExtent())) { skipped++; @@ -163,12 +193,27 @@ class LoadFiles extends ManagerRepo { tablet.getExtent()); continue; } + if (pauseLimit > 0 && tablet.getFiles().size() > pauseLimit) { + skipped++; + log.debug( + "{} tablet {} has {} files which exceeds the pause limit of {}, not bulk importing and will retry later", + fateId, tablet.getExtent(), tablet.getFiles().size(), pauseLimit); + continue; + } Map<ReferencedTabletFile,DataFileValue> filesToLoad = new HashMap<>(); var tabletTime = TabletTime.getInstance(tablet.getTime()); - int timeOffset = 0; + Long fileTime = null; + if (setTime) { + if (tablet.getLocation() == null) { + fileTime = tabletTime.getAndUpdateTime(); + } else { + fileTime = hostedTimestamps.get(tablet.getExtent()); + tabletTime.updateTimeIfGreater(fileTime); + } + } for (var entry : toLoad.entrySet()) { ReferencedTabletFile refTabFile = entry.getKey(); @@ -177,55 +222,35 @@ class LoadFiles extends ManagerRepo { DataFileValue dfv; if (setTime) { - if (tablet.getLocation() == null) { - dfv = new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries(), - tabletTime.getAndUpdateTime()); - } else { - long fileTime = hostedTimestamps.get(tablet.getExtent()) + timeOffset; - dfv = new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries(), - fileTime); - tabletTime.updateTimeIfGreater(fileTime); - timeOffset++; - } + // This should always be set outside the loop when setTime is true and should not be + // null at this point + Preconditions.checkState(fileTime != null); + dfv = + new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries(), fileTime); } else { dfv = new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries()); } filesToLoad.put(refTabFile, dfv); - } - // remove any files that were already loaded - tablet.getLoaded().keySet().forEach(stf -> { - filesToLoad.keySet().remove(stf.getTabletFile()); - }); + var tabletMutator = conditionalMutator.mutateTablet(tablet.getExtent()) + .requireAbsentOperation().requireSame(tablet, LOADED, requireSameCols); - if (!filesToLoad.isEmpty()) { - var tabletMutator = - conditionalMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation(); - - if (setTime) { - tabletMutator.requireSame(tablet, LOADED, TIME, LOCATION); - } else { - tabletMutator.requireSame(tablet, LOADED, LOCATION); - } - - filesToLoad.forEach((f, v) -> { - tabletMutator.putBulkFile(f, fateId); - tabletMutator.putFile(f, v); - }); + filesToLoad.forEach((f, v) -> { + tabletMutator.putBulkFile(f, fateId); + tabletMutator.putFile(f, v); + }); - if (setTime) { - tabletMutator.putTime(tabletTime.getMetadataTime()); - } + if (setTime) { + tabletMutator.putTime(tabletTime.getMetadataTime()); + } - // Hang on to for logging purposes in the case where the update is a - // success. - Preconditions.checkState( - loadingFiles.put(tablet.getExtent(), List.copyOf(filesToLoad.keySet())) == null); + // Hang on to loaded files for logging purposes in the case where the update is success. 
+ Preconditions.checkState( + loadingFiles.put(tablet.getExtent(), List.copyOf(filesToLoad.keySet())) == null); - tabletMutator.submit(tm -> false); - } + tabletMutator.submit(tm -> false); } } @@ -268,8 +293,8 @@ class LoadFiles extends ManagerRepo { client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, server, context, timeInMillis); - var timestamps = client.allocateTimestamps(TraceUtil.traceInfo(), context.rpcCreds(), - extents, numStamps); + var timestamps = + client.allocateTimestamps(TraceUtil.traceInfo(), context.rpcCreds(), extents); log.trace("{} allocate timestamps request to {} returned {} timestamps", fateId, server, timestamps.size()); @@ -333,10 +358,16 @@ class LoadFiles extends ManagerRepo { Loader loader = new Loader(); long t1; - loader.start(bulkDir, manager, fateId, bulkInfo.setTime); + loader.start(bulkDir, manager, tableId, fateId, bulkInfo.setTime); + + List<ColumnType> fetchCols = new ArrayList<>(List.of(PREV_ROW, LOCATION, LOADED, TIME)); + if (loader.pauseLimit > 0) { + fetchCols.add(FILES); + } + try (TabletsMetadata tabletsMetadata = TabletsMetadata.builder(manager.getContext()).forTable(tableId).overlapping(startRow, null) - .checkConsistency().fetch(PREV_ROW, LOCATION, LOADED, TIME).build()) { + .checkConsistency().fetch(fetchCols.toArray(new ColumnType[0])).build()) { // The tablet iterator and load mapping iterator are both iterating over data that is sorted // in the same way. The two iterators are each independently advanced to find common points in diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java index c2ae6f9cd7..b2d3521735 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java @@ -35,6 +35,7 @@ import java.util.function.Function; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; +import org.apache.accumulo.core.clientImpl.bulk.Bulk; import org.apache.accumulo.core.clientImpl.bulk.BulkImport; import org.apache.accumulo.core.clientImpl.bulk.BulkSerialize; import org.apache.accumulo.core.clientImpl.bulk.LoadMappingIterator; @@ -116,8 +117,10 @@ public class PrepBulkImport extends ManagerRepo { */ @VisibleForTesting static KeyExtent validateLoadMapping(String tableId, LoadMappingIterator lmi, - TabletIterFactory tabletIterFactory, int maxNumTablets) throws Exception { + TabletIterFactory tabletIterFactory, int maxNumTablets, int maxFilesPerTablet) + throws Exception { var currRange = lmi.next(); + checkFilesPerTablet(tableId, maxFilesPerTablet, currRange); Text startRow = currRange.getKey().prevEndRow(); @@ -143,6 +146,7 @@ public class PrepBulkImport extends ManagerRepo { break; } currRange = lmi.next(); + checkFilesPerTablet(tableId, maxFilesPerTablet, currRange); lastTablet = currRange.getKey(); } @@ -194,6 +198,17 @@ public class PrepBulkImport extends ManagerRepo { return new KeyExtent(firstTablet.tableId(), lastTablet.endRow(), firstTablet.prevEndRow()); } + private static void checkFilesPerTablet(String tableId, int maxFilesPerTablet, + Map.Entry<KeyExtent,Bulk.Files> currRange) throws AcceptableThriftTableOperationException { + if (maxFilesPerTablet > 0 && currRange.getValue().getSize() > maxFilesPerTablet) { + throw new 
AcceptableThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, + TableOperationExceptionType.OTHER, + "Attempted to import " + currRange.getValue().getSize() + + " files into a single tablet which exceeds the configured max of " + + maxFilesPerTablet); + } + } + private static class TabletIterFactoryImpl implements TabletIterFactory { private final List<AutoCloseable> resourcesToClose = new ArrayList<>(); private final Manager manager; @@ -228,12 +243,15 @@ public class PrepBulkImport extends ManagerRepo { int maxTablets = manager.getContext().getTableConfiguration(bulkInfo.tableId) .getCount(Property.TABLE_BULK_MAX_TABLETS); + int maxFilesPerTablet = manager.getContext().getTableConfiguration(bulkInfo.tableId) + .getCount(Property.TABLE_BULK_MAX_TABLET_FILES); try ( LoadMappingIterator lmi = BulkSerialize.readLoadMapping(bulkDir.toString(), bulkInfo.tableId, fs::open); TabletIterFactory tabletIterFactory = new TabletIterFactoryImpl(manager, bulkInfo)) { - return validateLoadMapping(bulkInfo.tableId.canonical(), lmi, tabletIterFactory, maxTablets); + return validateLoadMapping(bulkInfo.tableId.canonical(), lmi, tabletIterFactory, maxTablets, + maxFilesPerTablet); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java index 8bff563391..339857ec20 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java @@ -72,7 +72,6 @@ public class TabletRefresher { // request. There may also be tablets that had a location when the files were set but do not // have a location now, that is ok the next time that tablet loads somewhere it will see the // files. 
- var tabletIterator = tablets.stream().filter(tabletMetadata -> tabletMetadata.getLocation() != null) .filter(needsRefresh).iterator(); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java index 6c34c6aad3..1872123797 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java @@ -129,7 +129,7 @@ public class PrepBulkImportTest { .map(Text::toString).orElse(null); try (LoadMappingIterator lmi = createLoadMappingIter(loadRanges)) { - var extent = PrepBulkImport.validateLoadMapping("1", lmi, tabletIterFactory, maxTablets); + var extent = PrepBulkImport.validateLoadMapping("1", lmi, tabletIterFactory, maxTablets, 0); assertEquals(nke(minPrevEndRow, maxPrevEndRow), extent, loadRanges + " " + tabletRanges); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java index a815af72c3..0b5c246647 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java @@ -1181,7 +1181,7 @@ public class TabletClientHandler implements TabletServerClientService.Iface, @Override public Map<TKeyExtent,Long> allocateTimestamps(TInfo tinfo, TCredentials credentials, - List<TKeyExtent> extents, int numStamps) throws TException { + List<TKeyExtent> extents) throws TException { if (!security.canPerformSystemActions(credentials)) { throw new AccumuloSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED).asThriftException(); @@ -1195,8 +1195,7 @@ public class TabletClientHandler implements TabletServerClientService.Iface, var extent = KeyExtent.fromThrift(textent); Tablet tablet = tabletsSnapshot.get(extent); if (tablet != null) { - tablet.allocateTimestamp(numStamps) - .ifPresent(timestamp -> timestamps.put(textent, timestamp)); + tablet.allocateTimestamp().ifPresent(timestamp -> timestamps.put(textent, timestamp)); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 23e1e790f2..a022910cd7 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -1689,8 +1689,8 @@ public class Tablet extends TabletBase { // Its expected that what is persisted should be less than equal to the time that tablet has // in memory. 
Preconditions.checkState(tabletMetadata.getTime().getTime() <= tabletTime.getTime(), - "Time in metadata is ahead of tablet %s memory:%s metadata:%s", extent, tabletTime, - tabletMetadata.getTime()); + "Time in metadata is ahead of tablet %s memory:%s metadata:%s", extent, + tabletTime.getTime(), tabletMetadata.getTime()); // must update latestMetadata before computeNumEntries() is called Preconditions.checkState( @@ -1766,19 +1766,12 @@ public class Tablet extends TabletBase { return !activeScans.isEmpty() || writesInProgress > 0; } - public synchronized OptionalLong allocateTimestamp(int numStamps) { + public synchronized OptionalLong allocateTimestamp() { if (isClosing() || isClosed()) { return OptionalLong.empty(); } - - Preconditions.checkArgument(numStamps > 0); - long timestamp = Long.MIN_VALUE; - for (int i = 0; i < numStamps; i++) { - timestamp = tabletTime.getAndUpdateTime(); - } - - getTabletMemory().getCommitSession().updateMaxCommittedTime(timestamp); - - return OptionalLong.of(timestamp); + var time = tabletTime.getAndUpdateTime(); + getTabletMemory().getCommitSession().updateMaxCommittedTime(time); + return OptionalLong.of(time); } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java index 1a3439919a..d2fa75ff26 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java @@ -23,6 +23,7 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -70,6 +71,7 @@ import org.apache.accumulo.core.client.admin.TabletAvailability; import org.apache.accumulo.core.client.admin.TabletInformation; import org.apache.accumulo.core.client.admin.TimeType; import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; @@ -94,6 +96,7 @@ import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.TablePermission; import org.apache.accumulo.core.spi.crypto.NoCryptoServiceFactory; +import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.minicluster.MemoryUnit; @@ -116,6 +119,7 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import com.google.common.collect.MoreCollectors; import com.google.common.util.concurrent.MoreExecutors; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@ -283,7 +287,7 @@ public class BulkNewIT extends SharedMiniClusterBase { writeData(dir + "/f1.", aconf, 0, 332); // For this import tablet should be hosted so the bulk 
import operation will have to - // coordinate getting time with the hosted tablet. The time should refect the batch writes + // coordinate getting time with the hosted tablet. The time should reflect the batch writes // just done. client.tableOperations().importDirectory(dir).to(tableName).tableTime(true).load(); @@ -348,6 +352,187 @@ public class BulkNewIT extends SharedMiniClusterBase { } } + @Test + public void testPause() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + tableName = "testPause_table1"; + NewTableConfiguration newTableConf = new NewTableConfiguration(); + var props = + Map.of(Property.TABLE_FILE_PAUSE.getKey(), "5", Property.TABLE_MAJC_RATIO.getKey(), "20"); + newTableConf.setProperties(props); + client.tableOperations().create(tableName, newTableConf); + + addSplits(client, tableName, "0060 0120"); + String dir = getDir("/testPause1-"); + + for (int i = 0; i < 18; i++) { + writeData(dir + "/f" + i + ".", aconf, i * 10, (i + 1) * 10 - 1); + } + + client.tableOperations().importDirectory(dir).to(tableName).tableTime(true).load(); + verifyData(client, tableName, 0, 179, false); + + String dir2 = getDir("/testPause2-"); + + for (int i = 0; i < 18; i++) { + writeData(dir2 + "/f" + i + ".", aconf, i * 10, (i + 1) * 10 - 1, 1000); + } + + // Start a second bulk import in background thread because it is expected this bulk import + // will hang because tablets are over the pause file limit. + ExecutorService executor = Executors.newFixedThreadPool(1); + var future = executor.submit(() -> { + client.tableOperations().importDirectory(dir2).to(tableName).tableTime(true).load(); + return null; + }); + + // sleep a bit to give the bulk import a chance to run + UtilWaitThread.sleep(3000); + // bulk import should not have gone through it should be pausing because the tablet have too + // many files + assertFalse(future.isDone()); + verifyData(client, tableName, 0, 179, false); + + // Before the bulk import runs no tablets should have loaded flags set + assertEquals(Map.of("0060", 0, "0120", 0, "null", 0), countLoaded(client, tableName)); + // compacting the first tablet should allow the import on that tablet to proceed + client.tableOperations().compact(tableName, + new CompactionConfig().setWait(true).setEndRow(new Text("0060"))); + // Wait for the first tablets data to be updated by bulk import. + Wait.waitFor( + () -> Map.of("0060", 7, "0120", 0, "null", 0).equals(countLoaded(client, tableName))); + + // The bulk imports on the other tablets should not have gone through, verify their data was + // not updated. Spot check a few rows in the other two tablets. The first tablet may or may + // not be updated on the tablet server at this point, so can not look at its data. 
+ assertEquals(61L, readRowValue(client, tableName, 61)); + assertEquals(100L, readRowValue(client, tableName, 100)); + assertEquals(140L, readRowValue(client, tableName, 140)); + + // compact the entire table, should allow all bulk imports to go through + client.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + // wait for bulk import to complete + future.get(); + // verify the values were updated by the bulk import that was paused + verifyData(client, tableName, 0, 179, 1000, false); + assertEquals(Map.of("0060", 0, "0120", 0, "null", 0), countLoaded(client, tableName)); + } + } + + @Test + public void testMaxTabletsPerFile() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + tableName = "testMaxTabletsPerFile_table1"; + NewTableConfiguration newTableConf = new NewTableConfiguration(); + var props = Map.of(Property.TABLE_BULK_MAX_TABLET_FILES.getKey(), "5"); + newTableConf.setProperties(props); + client.tableOperations().create(tableName, newTableConf); + + String dir = getDir("/testBulkFileMFP-"); + + for (int i = 4; i < 8; i++) { + writeData(dir + "/f" + i + ".", aconf, i * 10, (i + 1) * 10 - 1); + } + + // should be able to bulk import 4 files w/o issue + client.tableOperations().importDirectory(dir).to(tableName).load(); + + verifyData(client, tableName, 40, 79, false); + + var dir2 = getDir("/testBulkFileMFP2-"); + for (int i = 12; i < 18; i++) { + writeData(dir2 + "/f" + i + ".", aconf, i * 10, (i + 1) * 10 - 1); + } + + var exception = assertThrows(AccumuloException.class, + () -> client.tableOperations().importDirectory(dir2).to(tableName).load()); + var msg = ((ThriftTableOperationException) exception.getCause()).getDescription(); + // message should contain the limit of 5 and the number of files attempted to import 6 + assertTrue(msg.contains(" 5"), msg); + assertTrue(msg.contains(" 6"), msg); + + // ensure no data was added to table + verifyData(client, tableName, 40, 79, false); + + // tested a table w/ single tablet, now test a table w/ three tablets and try importing into + // the first, middle, and last tablet + addSplits(client, tableName, "0100 0200"); + + // try the first tablet + var dir3 = getDir("/testBulkFileMFP3-"); + for (int i = 0; i < 7; i++) { + writeData(dir3 + "/f" + i + ".", aconf, i * 10, (i + 1) * 10 - 1); + } + // add single file for the last tablet, this does not exceed the limit however it should not + // go through + writeData(dir3 + "/f_last.", aconf, 300, 400); + exception = assertThrows(AccumuloException.class, + () -> client.tableOperations().importDirectory(dir3).to(tableName).load()); + // verify no files were moved by the failed bulk import + assertEquals(8, Arrays.stream( + getCluster().getFileSystem().listStatus(new Path(dir3), f -> f.getName().endsWith(".rf"))) + .count()); + msg = ((ThriftTableOperationException) exception.getCause()).getDescription(); + // message should contain the limit of 5 and the number of files attempted to import 7 + assertTrue(msg.contains(" 5"), msg); + assertTrue(msg.contains(" 7"), msg); + verifyData(client, tableName, 40, 79, false); + + // try the middle tablet + var dir4 = getDir("/testBulkFileMFP4-"); + for (int i = 11; i < 17; i++) { + writeData(dir4 + "/f" + i + ".", aconf, i * 10, (i + 1) * 10 - 1); + } + // add single file for the last tablet, this does not exceed the limit however it should not + // go through + writeData(dir4 + "/f_last.", aconf, 300, 400); + exception = assertThrows(AccumuloException.class, + () -> 
client.tableOperations().importDirectory(dir4).to(tableName).load()); + // verify no files were moved by the failed bulk import + assertEquals(7, Arrays.stream( + getCluster().getFileSystem().listStatus(new Path(dir4), f -> f.getName().endsWith(".rf"))) + .count()); + msg = ((ThriftTableOperationException) exception.getCause()).getDescription(); + // message should contain the limit of 5 and the number of files attempted to import 6 + assertTrue(msg.contains(" 5"), msg); + assertTrue(msg.contains(" 6"), msg); + verifyData(client, tableName, 40, 79, false); + + // try the last tablet + var dir5 = getDir("/testBulkFileMFP5-"); + for (int i = 21; i < 28; i++) { + writeData(dir5 + "/f" + i + ".", aconf, i * 10, (i + 1) * 10 - 1); + } + // add single file for the first tablet, this does not exceed the limit however it should not + // go through + writeData(dir5 + "/f_last.", aconf, 0, 10); + exception = assertThrows(AccumuloException.class, + () -> client.tableOperations().importDirectory(dir5).to(tableName).load()); + // verify no files were moved by the failed bulk import + assertEquals(8, Arrays.stream( + getCluster().getFileSystem().listStatus(new Path(dir5), f -> f.getName().endsWith(".rf"))) + .count()); + msg = ((ThriftTableOperationException) exception.getCause()).getDescription(); + // message should contain the limit of 5 and the number of files attempted to import 7 + assertTrue(msg.contains(" 5"), msg); + assertTrue(msg.contains(" 7"), msg); + verifyData(client, tableName, 40, 79, false); + + // test an import that has more files than the limit, but not in a single tablet so it should + // work + var dir6 = getDir("/testBulkFileMFP6-"); + for (int i = 8; i < 14; i++) { + writeData(dir6 + "/f" + i + ".", aconf, i * 10, (i + 1) * 10 - 1); + } + client.tableOperations().importDirectory(dir6).to(tableName).load(); + // verify the bulk import moved the files + assertEquals(0, Arrays.stream( + getCluster().getFileSystem().listStatus(new Path(dir6), f -> f.getName().endsWith(".rf"))) + .count()); + verifyData(client, tableName, 40, 139, false); + } + } + private void testSingleTabletSingleFileNoSplits(AccumuloClient c, boolean offline) throws Exception { if (offline) { @@ -926,8 +1111,32 @@ public class BulkNewIT extends SharedMiniClusterBase { client.tableOperations().addSplits(tableName, splits); } - private void verifyData(AccumuloClient client, String table, int start, int end, boolean setTime) - throws Exception { + private long readRowValue(AccumuloClient client, String table, int row) throws Exception { + try (var scanner = client.createScanner(table)) { + scanner.setRange(new Range(row(row))); + var value = scanner.stream().map(Entry::getValue).map(Value::toString) + .collect(MoreCollectors.onlyElement()); + return Long.parseLong(value); + } + } + + private Map<String,Integer> countLoaded(AccumuloClient client, String table) throws Exception { + var ctx = ((ClientContext) client); + var tableId = ctx.getTableId(table); + + try (var tabletsMetadata = ctx.getAmple().readTablets().forTable(tableId).build()) { + Map<String,Integer> counts = new HashMap<>(); + for (var tabletMetadata : tabletsMetadata) { + String endRow = + tabletMetadata.getEndRow() == null ? 
"null" : tabletMetadata.getEndRow().toString(); + counts.put(endRow, tabletMetadata.getLoaded().size()); + } + return counts; + } + } + + private void verifyData(AccumuloClient client, String table, int start, int end, int valueOffset, + boolean setTime) throws Exception { try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) { Iterator<Entry<Key,Value>> iter = scanner.iterator(); @@ -945,7 +1154,7 @@ public class BulkNewIT extends SharedMiniClusterBase { throw new Exception("unexpected row " + entry.getKey() + " " + i); } - if (Integer.parseInt(entry.getValue().toString()) != i) { + if (Integer.parseInt(entry.getValue().toString()) != valueOffset + i) { throw new Exception("unexpected value " + entry + " " + i); } @@ -960,6 +1169,11 @@ public class BulkNewIT extends SharedMiniClusterBase { } } + private void verifyData(AccumuloClient client, String table, int start, int end, boolean setTime) + throws Exception { + verifyData(client, table, start, end, 0, setTime); + } + private void verifyMetadata(AccumuloClient client, String tableName, Map<String,Set<String>> expectedHashes) { @@ -1001,7 +1215,7 @@ public class BulkNewIT extends SharedMiniClusterBase { return String.format("%04d", r); } - private String writeData(String file, AccumuloConfiguration aconf, int s, int e) + private String writeData(String file, AccumuloConfiguration aconf, int s, int e, int valueOffset) throws Exception { FileSystem fs = getCluster().getFileSystem(); String filename = file + RFile.EXTENSION; @@ -1011,13 +1225,18 @@ public class BulkNewIT extends SharedMiniClusterBase { .withTableConfiguration(aconf).build()) { writer.startDefaultLocalityGroup(); for (int i = s; i <= e; i++) { - writer.append(new Key(new Text(row(i))), new Value(Integer.toString(i))); + writer.append(new Key(new Text(row(i))), new Value(Integer.toString(valueOffset + i))); } } return hash(filename); } + private String writeData(String file, AccumuloConfiguration aconf, int s, int e) + throws Exception { + return writeData(file, aconf, s, e, 0); + } + /** * This constraint is used to simulate an error in the metadata write for a bulk import. */ diff --git a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java index f15968e09f..9ddff41e0d 100644 --- a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java @@ -266,7 +266,7 @@ public class NullTserver { @Override public Map<TKeyExtent,Long> allocateTimestamps(TInfo tinfo, TCredentials credentials, - List<TKeyExtent> tablets, int numStamps) throws TException { + List<TKeyExtent> tablets) throws TException { return Map.of(); } }