This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 3787f1b49d Pauses bulk imports into tablets w/ too many files (#5104)
3787f1b49d is described below

commit 3787f1b49dfe21e73257443708477b2d6c59286c
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Fri Dec 6 16:24:25 2024 -0500

    Pauses bulk imports into tablets w/ too many files (#5104)
    
    Bulk imports can add files to a tablet faster than compactions can
    shrink the number of files. Many scenarios can cause this, including
    the following:
    
     * Compactors are all busy when new bulk imports arrive.
     * Many processes bulk import a few files to a single tablet at around
       the same time.
     * A single process bulk imports a lot of files to a single tablet.
    
    When a tablet has too many files, it can eventually cause cascading
    problems for compactions and scans. This change adds two properties to
    help avoid this problem.
    
    The first property is `table.file.pause`. It pauses bulk imports, and
    eventually minor compactions, when a tablet's current file count
    exceeds this value. The default is unlimited, so by default nothing is
    ever paused.
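A minimal sketch of the pause rule described above, assuming the tablet's current file count and the configured `table.file.pause` value are available as plain numbers. `FilePauseRule` and `shouldPause` are hypothetical names used only for illustration; this is not Accumulo code.

```java
/** Hypothetical helper illustrating the table.file.pause rule; not Accumulo code. */
final class FilePauseRule {
  private FilePauseRule() {}

  /**
   * Returns true when a bulk import (or minor compaction) should wait.
   *
   * @param currentFiles number of files the tablet has right now
   * @param pauseLimit value of table.file.pause; 0 means unlimited
   */
  static boolean shouldPause(long currentFiles, long pauseLimit) {
    if (pauseLimit <= 0) {
      return false; // 0 (the default) disables pausing entirely
    }
    // Only the current count is compared; files the import would add are ignored.
    return currentFiles > pauseLimit;
  }
}
```

Because only the current count is checked, a tablet exactly at the limit is not paused.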
    
    The second property is `table.bulk.max.tablet.files`.  This property
    determines the maximum number of files a bulk import can add to a single
    tablet. When this limit is exceeded, the bulk import operation fails
    without making changes to any tablets.
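The all-or-nothing behavior of `table.bulk.max.tablet.files` can be sketched as validating every tablet before touching any of them. This assumes tablets are identified by plain strings and file counts live in a map; `MaxTabletFilesCheck` and its methods are hypothetical, not the actual `PrepBulkImport` code.

```java
import java.util.Map;

/** Hypothetical sketch of table.bulk.max.tablet.files validation; not Accumulo code. */
final class MaxTabletFilesCheck {
  private MaxTabletFilesCheck() {}

  /**
   * Checks every tablet before any change is made.
   *
   * @param filesPerTablet number of files the bulk import would add to each tablet
   * @param maxTabletFiles value of table.bulk.max.tablet.files; 0 means unlimited
   * @throws IllegalArgumentException if any tablet would receive too many files
   */
  static void validate(Map<String,Integer> filesPerTablet, int maxTabletFiles) {
    if (maxTabletFiles <= 0) {
      return; // unlimited
    }
    for (Map.Entry<String,Integer> e : filesPerTablet.entrySet()) {
      if (e.getValue() > maxTabletFiles) {
        throw new IllegalArgumentException("Tablet " + e.getKey() + " would receive "
            + e.getValue() + " files, exceeding limit " + maxTabletFiles);
      }
    }
  }

  /** Applies the import only after validation passes, so a failure leaves counts unchanged. */
  static void importFiles(Map<String,Integer> tabletFileCounts,
      Map<String,Integer> filesPerTablet, int maxTabletFiles) {
    validate(filesPerTablet, maxTabletFiles); // throws before any mutation
    filesPerTablet.forEach((tablet, n) -> tabletFileCounts.merge(tablet, n, Integer::sum));
  }
}
```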
    
    Below is an example of how these properties behave.
    
     1. Set table.file.pause=30
     2. Set table.bulk.max.tablet.files=100
     3. Import 20 files into tablet A; tablet A now has 20 files.
     4. Import 20 files into tablet A; tablet A now has 40 files.
     5. Import 20 files into tablet A. Because the tablet currently has 40
        files and the pause limit is 30, this bulk import pauses.
     6. Tablet A compacts 10 files, leaving it with 31 files. This is still
        above the pause limit, so the bulk import does not progress.
     7. Tablet A compacts 10 files, leaving it with 22 files.
     8. The paused bulk import proceeds, bringing tablet A to 42 files.
     9. Import 200 files into tablet B and one file into tablet A. This
        operation fails without changing tablet A or B because 200 exceeds
        the value of table.bulk.max.tablet.files.
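The walk-through above can be replayed with a small, hypothetical model that tracks only per-tablet file counts. It assumes each compaction of n files writes one output file (which matches the 40, 31, 22 counts) and, as a simplification, reports a paused import by returning false rather than modelling how the manager waits and retries. `BulkPauseWalkthrough` is not Accumulo code.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of the numbered example; tracks file counts only. */
final class BulkPauseWalkthrough {
  final long pauseLimit; // table.file.pause (0 = unlimited)
  final long maxTabletFiles; // table.bulk.max.tablet.files (0 = unlimited)
  final Map<String,Long> files = new HashMap<>();

  BulkPauseWalkthrough(long pauseLimit, long maxTabletFiles) {
    this.pauseLimit = pauseLimit;
    this.maxTabletFiles = maxTabletFiles;
  }

  long count(String tablet) {
    return files.getOrDefault(tablet, 0L);
  }

  /** True if a bulk import into this tablet would currently have to wait. */
  boolean paused(String tablet) {
    return pauseLimit > 0 && count(tablet) > pauseLimit;
  }

  /**
   * Throws (changing nothing) if any tablet would get too many files; returns false
   * (changing nothing) if any target tablet is paused; otherwise adds the files.
   */
  boolean tryImport(Map<String,Long> toAdd) {
    for (Map.Entry<String,Long> e : toAdd.entrySet()) {
      if (maxTabletFiles > 0 && e.getValue() > maxTabletFiles) {
        throw new IllegalArgumentException("too many files for tablet " + e.getKey());
      }
    }
    for (String tablet : toAdd.keySet()) {
      if (paused(tablet)) {
        return false; // the caller would retry later
      }
    }
    toAdd.forEach((tablet, n) -> files.merge(tablet, n, Long::sum));
    return true;
  }

  /** Compacts n of a tablet's files into a single output file. */
  void compact(String tablet, long n) {
    files.put(tablet, count(tablet) - n + 1);
  }
}
```

With the example's values, a tablet sitting exactly at the pause limit of 30 files is not paused and could still accept up to 100 files in one import, briefly reaching 130. That is the "pause limit plus max tablet files" surge bound mentioned in the `table.file.pause` property description.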
    
    While making this change, two preexisting problems came up. The first
    was with bulk import setting time. When multiple files were imported,
    the time-setting behavior was incorrect and inconsistent, depending on
    the table time type and on whether the tablet was hosted. This change
    makes the behavior consistent for hosted and unhosted tablets and for
    both table time types: a single timestamp is allocated for all files in
    all cases. Previously, the code allocated a different number of
    timestamps in each of the four cases. That behavior caused tablet
    refresh to fail, which in turn caused these changes to fail, so the
    existing issue had to be fixed before progress could be made. The new
    test in this PR, which adds many files to a single tablet and requests
    that bulk import set time, uncovered the problem.
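The time fix can be illustrated with a hypothetical allocator whose `allocateTimestamps` takes only a list of tablets and returns one timestamp per tablet, mirroring the shape of the Thrift call after `numStamps` was removed. Tablets are plain strings, and the per-tablet counter starting at 1 is an assumption made for the sketch, not Accumulo's tablet time implementation.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of per-tablet bulk timestamp allocation; not Accumulo code. */
final class BulkTimeAllocator {
  private final Map<String,Long> nextTime = new HashMap<>();

  /** Allocates exactly one timestamp per tablet, regardless of how many files it receives. */
  Map<String,Long> allocateTimestamps(List<String> tablets) {
    Map<String,Long> result = new HashMap<>();
    for (String tablet : tablets) {
      long t = nextTime.getOrDefault(tablet, 1L);
      nextTime.put(tablet, t + 1); // advance by one, not by the number of files
      result.put(tablet, t);
    }
    return result;
  }

  /** Every file bulk imported into a tablet in one operation shares that tablet's timestamp. */
  static Map<String,Long> assignTime(List<String> files, long tabletTime) {
    Map<String,Long> fileTimes = new HashMap<>();
    for (String f : files) {
      fileTimes.put(f, tabletTime);
    }
    return fileTimes;
  }
}
```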
    
    The second problem was that the existing code handled the case of bulk
    import adding only a subset of files to a tablet. This should never
    happen because files are added via a mutation, and either the entire
    mutation goes through or nothing does. This handling was removed, and
    the code now throws an exception if any subset other than the empty set
    is seen. This change greatly simplified implementing this feature.
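A minimal sketch of the stricter check, assuming the files already recorded as loaded for this bulk operation and the files it intends to load are available as sets of names. `LoadedFilesCheck.needsLoad` is a hypothetical helper, not the actual `LoadFiles` code.

```java
import java.util.Set;

/** Hypothetical sketch of the all-or-nothing loaded-files check; not Accumulo code. */
final class LoadedFilesCheck {
  private LoadedFilesCheck() {}

  /**
   * Returns true if the tablet still needs the files, false if all were already loaded.
   * Files are added by one mutation, so seeing only some of them loaded is treated as a bug.
   */
  static boolean needsLoad(Set<String> alreadyLoaded, Set<String> filesToLoad) {
    if (alreadyLoaded.isEmpty()) {
      return true; // nothing loaded yet
    }
    if (alreadyLoaded.equals(filesToLoad)) {
      return false; // the whole mutation already went through
    }
    throw new IllegalStateException(
        "Unexpected partial load: " + alreadyLoaded + " of " + filesToLoad);
  }
}
```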
    
    fixes #5023
---
 .../org/apache/accumulo/core/conf/Property.java    |  44 ++--
 .../thrift/TabletServerClientService.java          | 132 ++----------
 core/src/main/thrift/tabletserver.thrift           |   1 -
 .../manager/tableOps/bulkVer2/LoadFiles.java       | 121 +++++++----
 .../manager/tableOps/bulkVer2/PrepBulkImport.java  |  22 +-
 .../manager/tableOps/bulkVer2/TabletRefresher.java |   1 -
 .../tableOps/bulkVer2/PrepBulkImportTest.java      |   2 +-
 .../accumulo/tserver/TabletClientHandler.java      |   5 +-
 .../org/apache/accumulo/tserver/tablet/Tablet.java |  19 +-
 .../apache/accumulo/test/functional/BulkNewIT.java | 231 ++++++++++++++++++++-
 .../accumulo/test/performance/NullTserver.java     |   2 +-
 11 files changed, 375 insertions(+), 205 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index b9b7f72181..f330249998 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -921,6 +921,15 @@ public enum Property {
       "The maximum amount of memory that will be used to cache results of a client query/scan. "
           + "Once this limit is reached, the buffered data is sent to the client.",
       "1.3.5"),
+  TABLE_BULK_MAX_TABLETS("table.bulk.max.tablets", "0", PropertyType.COUNT,
+      "The maximum number of tablets allowed for one bulk import file. Value of 0 is Unlimited. "
+          + "This property is only enforced in the new bulk import API.",
+      "2.1.0"),
+  TABLE_BULK_MAX_TABLET_FILES("table.bulk.max.tablet.files", "0", PropertyType.COUNT,
+      "The maximum number of files a bulk import can add to a single tablet.  When this property "
+          + "is exceeded for any tablet the entire bulk import operation will fail before any making "
+          + "changes. Value of 0 is unlimited.",
+      "4.0.0"),
   TABLE_FILE_TYPE("table.file.type", RFile.EXTENSION, PropertyType.FILENAME_EXT,
       "Change the type of file a table writes.", "1.3.5"),
   TABLE_LOAD_BALANCER("table.balancer", "org.apache.accumulo.core.spi.balancer.SimpleLoadBalancer",
@@ -947,17 +956,28 @@ public enum Property {
           + " defaults are used.",
       "1.3.5"),
   TABLE_FILE_MAX("table.file.max", "15", PropertyType.COUNT,
-      "The maximum number of RFiles each tablet in a table can have. When"
-          + " adjusting this property you may want to consider adjusting"
-          + " table.compaction.major.ratio also. Setting this property to 0 will make"
-          + " it default to tserver.scan.files.open.max-1, this will prevent a tablet"
-          + " from having more RFiles than can be opened. Prior to 2.1.0 this property"
-          + " was used to trigger merging minor compactions, but merging minor compactions"
-          + " were removed in 2.1.0. Now this property is only used by the"
-          + " DefaultCompactionStrategy and the RatioBasedCompactionPlanner."
-          + " The RatioBasedCompactionPlanner started using this property in 2.1.3, before"
-          + " that it did not use the property.",
+      "This property is used to signal to the compaction planner that it should be more "
+          + "aggressive for compacting tablets that exceed this limit. The "
+          + "RatioBasedCompactionPlanner will lower the compaction ratio and increase the "
+          + "priority for tablets that exceed this limit. When  adjusting this property you may "
+          + "want to consider adjusting table.compaction.major.ratio also. Setting this property "
+          + "to 0 will make it default to tserver.scan.files.open.max-1, this will prevent a tablet"
+          + " from having more RFiles than can be opened by a scan.",
       "1.4.0"),
+  TABLE_FILE_PAUSE("table.file.pause", "0", PropertyType.COUNT,
+      "When a tablet has more than this number of files, bulk imports and minor compactions "
+          + "will wait until the tablet has less files before proceeding.  This will cause back "
+          + "pressure on bulk imports and writes to tables when compactions are not keeping up. "
+          + "Only the number of files a tablet currently has is considered for pausing, the "
+          + "number of files a bulk import will add is not considered. This means a bulk import "
+          + "can surge above this limit once causing future bulk imports or minor compactions to "
+          + "pause until compactions can catch up.  This property plus "
+          + TABLE_BULK_MAX_TABLET_FILES.getKey()
+          + " determines the total number of files a tablet could temporarily surge to based on bulk "
+          + "imports.  Ideally this property would be set higher than " + TABLE_FILE_MAX.getKey()
+          + " so that compactions are more aggressive prior to reaching the pause point. Value of 0 is "
+          + "unlimited.",
+      "4.0.0"),
   TABLE_MERGE_FILE_MAX("table.merge.file.max", "10000", PropertyType.COUNT,
       "The maximum number of files that a merge operation will process.  Before "
           + "merging a sum of the number of files in the merge range is computed and if it "
@@ -994,10 +1014,6 @@ public enum Property {
       "1.3.5"),
   TABLE_BLOOM_HASHTYPE("table.bloom.hash.type", "murmur", PropertyType.STRING,
       "The bloom filter hash type.", "1.3.5"),
-  TABLE_BULK_MAX_TABLETS("table.bulk.max.tablets", "0", PropertyType.COUNT,
-      "The maximum number of tablets allowed for one bulk import file. Value of 0 is Unlimited. "
-          + "This property is only enforced in the new bulk import API.",
-      "2.1.0"),
   TABLE_DURABILITY("table.durability", "sync", PropertyType.DURABILITY,
       "The durability used to write to the write-ahead log. Legal values are:"
           + " none, which skips the write-ahead log; log, which sends the data to the"
diff --git a/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TabletServerClientService.java b/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TabletServerClientService.java
index 9eb4e3ea7e..9c66164840 100644
--- a/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TabletServerClientService.java
+++ b/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TabletServerClientService.java
@@ -57,7 +57,7 @@ public class TabletServerClientService {
 
     public java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> refreshTablets(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tabletsToRefresh) throws org.apache.thrift.TException;
 
-    public java.util.Map<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent,java.lang.Long> allocateTimestamps(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tablets, int numStamps) throws org.apache.thrift.TException;
+    public java.util.Map<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent,java.lang.Long> allocateTimestamps(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tablets) throws org.apache.thrift.TException;
 
   }
 
@@ -91,7 +91,7 @@ public class TabletServerClientService {
 
     public void refreshTablets(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tabletsToRefresh, org.apache.thrift.async.AsyncMethodCallback<java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent>> resultHandler) throws org.apache.thrift.TException;
 
-    public void allocateTimestamps(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tablets, int numStamps, org.apache.thrift.async.AsyncMethodCallback<java.util.Map<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent,java.lang.Long>> resultHandler) throws org.apache.thrift.TException;
+    public void allocateTimestamps(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tablets, org.apache.thrift.async.AsyncMethodCallback<java.util.Map<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent,java.lang.Long>> resultHandler) throws org.apache.thrift.TException;
 
   }
 
@@ -477,19 +477,18 @@ public class TabletServerClientService {
     }
 
     @Override
-    public java.util.Map<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent,java.lang.Long> allocateTimestamps(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tablets, int numStamps) throws org.apache.thrift.TException
+    public java.util.Map<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent,java.lang.Long> allocateTimestamps(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tablets) throws org.apache.thrift.TException
     {
-      send_allocateTimestamps(tinfo, credentials, tablets, numStamps);
+      send_allocateTimestamps(tinfo, credentials, tablets);
       return recv_allocateTimestamps();
     }
 
-    public void send_allocateTimestamps(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tablets, int numStamps) throws org.apache.thrift.TException
+    public void send_allocateTimestamps(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tablets) throws org.apache.thrift.TException
     {
       allocateTimestamps_args args = new allocateTimestamps_args();
       args.setTinfo(tinfo);
       args.setCredentials(credentials);
       args.setTablets(tablets);
-      args.setNumStamps(numStamps);
       sendBase("allocateTimestamps", args);
     }
 
@@ -1101,9 +1100,9 @@ public class TabletServerClientService {
     }
 
     @Override
-    public void allocateTimestamps(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tablets, int numStamps, org.apache.thrift.async.AsyncMethodCallback<java.util.Map<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent,java.lang.Long>> resultHandler) throws org.apache.thrift.TException {
+    public void allocateTimestamps(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tablets, org.apache.thrift.async.AsyncMethodCallback<java.util.Map<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent,java.lang.Long>> resultHandler) throws org.apache.thrift.TException {
       checkReady();
-      allocateTimestamps_call method_call = new allocateTimestamps_call(tinfo, credentials, tablets, numStamps, resultHandler, this, ___protocolFactory, ___transport);
+      allocateTimestamps_call method_call = new allocateTimestamps_call(tinfo, credentials, tablets, resultHandler, this, ___protocolFactory, ___transport);
       this.___currentMethod = method_call;
       ___manager.call(method_call);
     }
@@ -1112,13 +1111,11 @@ public class TabletServerClientService {
       private org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo;
       private org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials;
       private java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tablets;
-      private int numStamps;
-      public allocateTimestamps_call(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tablets, int numStamps, org.apache.thrift.async.AsyncMethodCallback<java.util.Map<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent,java.lang.Long>> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory proto [...]
+      public allocateTimestamps_call(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tablets, org.apache.thrift.async.AsyncMethodCallback<java.util.Map<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent,java.lang.Long>> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org [...]
         super(client, protocolFactory, transport, resultHandler, false);
         this.tinfo = tinfo;
         this.credentials = credentials;
         this.tablets = tablets;
-        this.numStamps = numStamps;
       }
 
       @Override
@@ -1128,7 +1125,6 @@ public class TabletServerClientService {
         args.setTinfo(tinfo);
         args.setCredentials(credentials);
         args.setTablets(tablets);
-        args.setNumStamps(numStamps);
         args.write(prot);
         prot.writeMessageEnd();
       }
@@ -1625,7 +1621,7 @@ public class TabletServerClientService {
       @Override
       public allocateTimestamps_result getResult(I iface, allocateTimestamps_args args) throws org.apache.thrift.TException {
         allocateTimestamps_result result = new allocateTimestamps_result();
-        result.success = iface.allocateTimestamps(args.tinfo, args.credentials, args.tablets, args.numStamps);
+        result.success = iface.allocateTimestamps(args.tinfo, args.credentials, args.tablets);
         return result;
       }
     }
@@ -2620,7 +2616,7 @@ public class TabletServerClientService {
 
       @Override
       public void start(I iface, allocateTimestamps_args args, org.apache.thrift.async.AsyncMethodCallback<java.util.Map<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent,java.lang.Long>> resultHandler) throws org.apache.thrift.TException {
-        iface.allocateTimestamps(args.tinfo, args.credentials, args.tablets, args.numStamps,resultHandler);
+        iface.allocateTimestamps(args.tinfo, args.credentials, args.tablets,resultHandler);
       }
     }
 
@@ -16715,7 +16711,6 @@ public class TabletServerClientService {
     private static final org.apache.thrift.protocol.TField TINFO_FIELD_DESC = new org.apache.thrift.protocol.TField("tinfo", org.apache.thrift.protocol.TType.STRUCT, (short)1);
     private static final org.apache.thrift.protocol.TField CREDENTIALS_FIELD_DESC = new org.apache.thrift.protocol.TField("credentials", org.apache.thrift.protocol.TType.STRUCT, (short)2);
     private static final org.apache.thrift.protocol.TField TABLETS_FIELD_DESC = new org.apache.thrift.protocol.TField("tablets", org.apache.thrift.protocol.TType.LIST, (short)3);
-    private static final org.apache.thrift.protocol.TField NUM_STAMPS_FIELD_DESC = new org.apache.thrift.protocol.TField("numStamps", org.apache.thrift.protocol.TType.I32, (short)4);
 
     private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new allocateTimestamps_argsStandardSchemeFactory();
     private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new allocateTimestamps_argsTupleSchemeFactory();
@@ -16723,14 +16718,12 @@ public class TabletServerClientService {
     public @org.apache.thrift.annotation.Nullable org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo; // required
     public @org.apache.thrift.annotation.Nullable org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials; // required
     public @org.apache.thrift.annotation.Nullable java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tablets; // required
-    public int numStamps; // required
 
     /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
     public enum _Fields implements org.apache.thrift.TFieldIdEnum {
       TINFO((short)1, "tinfo"),
       CREDENTIALS((short)2, "credentials"),
-      TABLETS((short)3, "tablets"),
-      NUM_STAMPS((short)4, "numStamps");
+      TABLETS((short)3, "tablets");
 
       private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
 
@@ -16752,8 +16745,6 @@ public class TabletServerClientService {
             return CREDENTIALS;
           case 3: // TABLETS
             return TABLETS;
-          case 4: // NUM_STAMPS
-            return NUM_STAMPS;
           default:
             return null;
         }
@@ -16797,8 +16788,6 @@ public class TabletServerClientService {
     }
 
     // isset id assignments
-    private static final int __NUMSTAMPS_ISSET_ID = 0;
-    private byte __isset_bitfield = 0;
     public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
     static {
       java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -16809,8 +16798,6 @@ public class TabletServerClientService {
       tmpMap.put(_Fields.TABLETS, new org.apache.thrift.meta_data.FieldMetaData("tablets", org.apache.thrift.TFieldRequirementType.DEFAULT, 
           new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, 
               new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, org.apache.accumulo.core.dataImpl.thrift.TKeyExtent.class))));
-      tmpMap.put(_Fields.NUM_STAMPS, new org.apache.thrift.meta_data.FieldMetaData("numStamps", org.apache.thrift.TFieldRequirementType.DEFAULT, 
-          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
       metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
       org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(allocateTimestamps_args.class, metaDataMap);
     }
@@ -16821,22 +16808,18 @@ public class TabletServerClientService {
     public allocateTimestamps_args(
       org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo,
       org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials,
-      java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tablets,
-      int numStamps)
+      java.util.List<org.apache.accumulo.core.dataImpl.thrift.TKeyExtent> tablets)
     {
       this();
       this.tinfo = tinfo;
       this.credentials = credentials;
       this.tablets = tablets;
-      this.numStamps = numStamps;
-      setNumStampsIsSet(true);
     }
 
     /**
      * Performs a deep copy on <i>other</i>.
      */
     public allocateTimestamps_args(allocateTimestamps_args other) {
-      __isset_bitfield = other.__isset_bitfield;
       if (other.isSetTinfo()) {
         this.tinfo = new org.apache.accumulo.core.clientImpl.thrift.TInfo(other.tinfo);
       }
@@ -16850,7 +16833,6 @@ public class TabletServerClientService {
         }
         this.tablets = __this__tablets;
       }
-      this.numStamps = other.numStamps;
     }
 
     @Override
@@ -16863,8 +16845,6 @@ public class TabletServerClientService {
       this.tinfo = null;
       this.credentials = null;
       this.tablets = null;
-      setNumStampsIsSet(false);
-      this.numStamps = 0;
     }
 
     @org.apache.thrift.annotation.Nullable
@@ -16958,29 +16938,6 @@ public class TabletServerClientService {
       }
     }
 
-    public int getNumStamps() {
-      return this.numStamps;
-    }
-
-    public allocateTimestamps_args setNumStamps(int numStamps) {
-      this.numStamps = numStamps;
-      setNumStampsIsSet(true);
-      return this;
-    }
-
-    public void unsetNumStamps() {
-      __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __NUMSTAMPS_ISSET_ID);
-    }
-
-    /** Returns true if field numStamps is set (has been assigned a value) and false otherwise */
-    public boolean isSetNumStamps() {
-      return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __NUMSTAMPS_ISSET_ID);
-    }
-
-    public void setNumStampsIsSet(boolean value) {
-      __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __NUMSTAMPS_ISSET_ID, value);
-    }
-
     @Override
     public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) {
       switch (field) {
@@ -17008,14 +16965,6 @@ public class TabletServerClientService {
         }
         break;
 
-      case NUM_STAMPS:
-        if (value == null) {
-          unsetNumStamps();
-        } else {
-          setNumStamps((java.lang.Integer)value);
-        }
-        break;
-
       }
     }
 
@@ -17032,9 +16981,6 @@ public class TabletServerClientService {
       case TABLETS:
         return getTablets();
 
-      case NUM_STAMPS:
-        return getNumStamps();
-
       }
       throw new java.lang.IllegalStateException();
     }
@@ -17053,8 +16999,6 @@ public class TabletServerClientService {
         return isSetCredentials();
       case TABLETS:
         return isSetTablets();
-      case NUM_STAMPS:
-        return isSetNumStamps();
       }
       throw new java.lang.IllegalStateException();
     }
@@ -17099,15 +17043,6 @@ public class TabletServerClientService {
           return false;
       }
 
-      boolean this_present_numStamps = true;
-      boolean that_present_numStamps = true;
-      if (this_present_numStamps || that_present_numStamps) {
-        if (!(this_present_numStamps && that_present_numStamps))
-          return false;
-        if (this.numStamps != that.numStamps)
-          return false;
-      }
-
       return true;
     }
 
@@ -17127,8 +17062,6 @@ public class TabletServerClientService {
       if (isSetTablets())
         hashCode = hashCode * 8191 + tablets.hashCode();
 
-      hashCode = hashCode * 8191 + numStamps;
-
       return hashCode;
     }
 
@@ -17170,16 +17103,6 @@ public class TabletServerClientService {
           return lastComparison;
         }
       }
-      lastComparison = java.lang.Boolean.compare(isSetNumStamps(), other.isSetNumStamps());
-      if (lastComparison != 0) {
-        return lastComparison;
-      }
-      if (isSetNumStamps()) {
-        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.numStamps, other.numStamps);
-        if (lastComparison != 0) {
-          return lastComparison;
-        }
-      }
       return 0;
     }
 
@@ -17227,10 +17150,6 @@ public class TabletServerClientService {
         sb.append(this.tablets);
       }
       first = false;
-      if (!first) sb.append(", ");
-      sb.append("numStamps:");
-      sb.append(this.numStamps);
-      first = false;
       sb.append(")");
       return sb.toString();
     }
@@ -17256,8 +17175,6 @@ public class TabletServerClientService {
 
     private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException {
       try {
-        // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
-        __isset_bitfield = 0;
         read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
       } catch (org.apache.thrift.TException te) {
         throw new java.io.IOException(te);
@@ -17321,14 +17238,6 @@ public class TabletServerClientService {
                 org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
               }
               break;
-            case 4: // NUM_STAMPS
-              if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
-                struct.numStamps = iprot.readI32();
-                struct.setNumStampsIsSet(true);
-              } else { 
-                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
-              }
-              break;
             default:
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
           }
@@ -17367,9 +17276,6 @@ public class TabletServerClientService {
           }
           oprot.writeFieldEnd();
         }
-        oprot.writeFieldBegin(NUM_STAMPS_FIELD_DESC);
-        oprot.writeI32(struct.numStamps);
-        oprot.writeFieldEnd();
         oprot.writeFieldStop();
         oprot.writeStructEnd();
       }
@@ -17398,10 +17304,7 @@ public class TabletServerClientService {
         if (struct.isSetTablets()) {
           optionals.set(2);
         }
-        if (struct.isSetNumStamps()) {
-          optionals.set(3);
-        }
-        oprot.writeBitSet(optionals, 4);
+        oprot.writeBitSet(optionals, 3);
         if (struct.isSetTinfo()) {
           struct.tinfo.write(oprot);
         }
@@ -17417,15 +17320,12 @@ public class TabletServerClientService {
             }
           }
         }
-        if (struct.isSetNumStamps()) {
-          oprot.writeI32(struct.numStamps);
-        }
       }
 
       @Override
       public void read(org.apache.thrift.protocol.TProtocol prot, allocateTimestamps_args struct) throws org.apache.thrift.TException {
         org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
-        java.util.BitSet incoming = iprot.readBitSet(4);
+        java.util.BitSet incoming = iprot.readBitSet(3);
         if (incoming.get(0)) {
           struct.tinfo = new org.apache.accumulo.core.clientImpl.thrift.TInfo();
           struct.tinfo.read(iprot);
@@ -17450,10 +17350,6 @@ public class TabletServerClientService {
           }
           struct.setTabletsIsSet(true);
         }
-        if (incoming.get(3)) {
-          struct.numStamps = iprot.readI32();
-          struct.setNumStampsIsSet(true);
-        }
       }
     }
 
diff --git a/core/src/main/thrift/tabletserver.thrift b/core/src/main/thrift/tabletserver.thrift
index bcf85d32be..1069445ac7 100644
--- a/core/src/main/thrift/tabletserver.thrift
+++ b/core/src/main/thrift/tabletserver.thrift
@@ -243,7 +243,6 @@ service TabletServerClientService {
     1:client.TInfo tinfo
     2:security.TCredentials credentials
     3:list<data.TKeyExtent> tablets
-    4:i32 numStamps
   )
 }
 
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
index 711387955d..62eec1ebe1 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
@@ -18,6 +18,7 @@
  */
 package org.apache.accumulo.manager.tableOps.bulkVer2;
 
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED;
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
@@ -25,6 +26,7 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType.CURRENT;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -54,6 +56,7 @@ import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
@@ -119,8 +122,10 @@ class LoadFiles extends ManagerRepo {
     private Map<KeyExtent,List<TabletFile>> loadingFiles;
 
     private long skipped = 0;
+    private long pauseLimit;
 
-    void start(Path bulkDir, Manager manager, FateId fateId, boolean setTime) throws Exception {
+    void start(Path bulkDir, Manager manager, TableId tableId, FateId fateId, boolean setTime)
+        throws Exception {
       this.bulkDir = bulkDir;
       this.manager = manager;
       this.fateId = fateId;
@@ -128,6 +133,8 @@ class LoadFiles extends ManagerRepo {
       conditionalMutator = manager.getContext().getAmple().conditionallyMutateTablets();
       this.skipped = 0;
       this.loadingFiles = new HashMap<>();
+      this.pauseLimit =
+          manager.getContext().getTableConfiguration(tableId).getCount(Property.TABLE_FILE_PAUSE);
     }
 
     void load(List<TabletMetadata> tablets, Files files) {
@@ -141,7 +148,18 @@ class LoadFiles extends ManagerRepo {
       tablets = tablets.stream().filter(tabletMeta -> {
         Set<ReferencedTabletFile> loaded = tabletMeta.getLoaded().keySet().stream()
             .map(StoredTabletFile::getTabletFile).collect(Collectors.toSet());
-        return !loaded.containsAll(toLoad.keySet());
+        boolean containsAll = loaded.containsAll(toLoad.keySet());
+        // The tablet should either contain all loaded files or none. It should never contain a
+        // subset. Loaded files are written in single mutation to accumulo, either all changes in a
+        // mutation should go through or none.
+        Preconditions.checkState(containsAll || Collections.disjoint(loaded, toLoad.keySet()),
+            "Tablet %s has a subset of loaded files %s %s", tabletMeta.getExtent(), loaded,
+            toLoad.keySet());
+        if (containsAll) {
+          log.trace("{} tablet {} has already loaded all files, nothing to do", fateId,
+              tabletMeta.getExtent());
+        }
+        return !containsAll;
       }).collect(Collectors.toList());
 
       // timestamps from tablets that are hosted on a tablet server
@@ -155,7 +173,19 @@ class LoadFiles extends ManagerRepo {
         hostedTimestamps = Map.of();
       }
 
+      List<ColumnType> rsc = new ArrayList<>();
+      rsc.add(LOCATION);
+      if (setTime) {
+        rsc.add(TIME);
+      }
+      if (pauseLimit > 0) {
+        rsc.add(FILES);
+      }
+
+      ColumnType[] requireSameCols = rsc.toArray(new ColumnType[0]);
+
       for (TabletMetadata tablet : tablets) {
+        // Skip any tablets at the beginning of the loop before any work is done.
         if (setTime && tablet.getLocation() != null
             && !hostedTimestamps.containsKey(tablet.getExtent())) {
           skipped++;
@@ -163,12 +193,27 @@ class LoadFiles extends ManagerRepo {
               tablet.getExtent());
           continue;
         }
+        if (pauseLimit > 0 && tablet.getFiles().size() > pauseLimit) {
+          skipped++;
+          log.debug(
+              "{} tablet {} has {} files which exceeds the pause limit of {}, not bulk importing and will retry later",
+              fateId, tablet.getExtent(), tablet.getFiles().size(), pauseLimit);
+          continue;
+        }
 
         Map<ReferencedTabletFile,DataFileValue> filesToLoad = new HashMap<>();
 
         var tabletTime = TabletTime.getInstance(tablet.getTime());
 
-        int timeOffset = 0;
+        Long fileTime = null;
+        if (setTime) {
+          if (tablet.getLocation() == null) {
+            fileTime = tabletTime.getAndUpdateTime();
+          } else {
+            fileTime = hostedTimestamps.get(tablet.getExtent());
+            tabletTime.updateTimeIfGreater(fileTime);
+          }
+        }
 
         for (var entry : toLoad.entrySet()) {
           ReferencedTabletFile refTabFile = entry.getKey();
@@ -177,55 +222,35 @@ class LoadFiles extends ManagerRepo {
           DataFileValue dfv;
 
           if (setTime) {
-            if (tablet.getLocation() == null) {
-              dfv = new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries(),
-                  tabletTime.getAndUpdateTime());
-            } else {
-              long fileTime = hostedTimestamps.get(tablet.getExtent()) + timeOffset;
-              dfv = new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries(),
-                  fileTime);
-              tabletTime.updateTimeIfGreater(fileTime);
-              timeOffset++;
-            }
+            // This should always be set outside the loop when setTime is true and should not be
+            // null at this point
+            Preconditions.checkState(fileTime != null);
+            dfv =
+                new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries(), fileTime);
           } else {
             dfv = new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries());
           }
 
           filesToLoad.put(refTabFile, dfv);
-
         }
 
-        // remove any files that were already loaded
-        tablet.getLoaded().keySet().forEach(stf -> {
-          filesToLoad.keySet().remove(stf.getTabletFile());
-        });
+        var tabletMutator = conditionalMutator.mutateTablet(tablet.getExtent())
+            .requireAbsentOperation().requireSame(tablet, LOADED, requireSameCols);
 
-        if (!filesToLoad.isEmpty()) {
-          var tabletMutator =
-              conditionalMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation();
-
-          if (setTime) {
-            tabletMutator.requireSame(tablet, LOADED, TIME, LOCATION);
-          } else {
-            tabletMutator.requireSame(tablet, LOADED, LOCATION);
-          }
-
-          filesToLoad.forEach((f, v) -> {
-            tabletMutator.putBulkFile(f, fateId);
-            tabletMutator.putFile(f, v);
-          });
+        filesToLoad.forEach((f, v) -> {
+          tabletMutator.putBulkFile(f, fateId);
+          tabletMutator.putFile(f, v);
+        });
 
-          if (setTime) {
-            tabletMutator.putTime(tabletTime.getMetadataTime());
-          }
+        if (setTime) {
+          tabletMutator.putTime(tabletTime.getMetadataTime());
+        }
 
-          // Hang on to for logging purposes in the case where the update is a
-          // success.
-          Preconditions.checkState(
-              loadingFiles.put(tablet.getExtent(), List.copyOf(filesToLoad.keySet())) == null);
+        // Hang on to loaded files for logging purposes in the case where the update is success.
+        Preconditions.checkState(
+            loadingFiles.put(tablet.getExtent(), List.copyOf(filesToLoad.keySet())) == null);
 
-          tabletMutator.submit(tm -> false);
-        }
+        tabletMutator.submit(tm -> false);
       }
     }
 
@@ -268,8 +293,8 @@ class LoadFiles extends ManagerRepo {
         client =
             ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, server, context, timeInMillis);
 
-        var timestamps = client.allocateTimestamps(TraceUtil.traceInfo(), context.rpcCreds(),
-            extents, numStamps);
+        var timestamps =
+            client.allocateTimestamps(TraceUtil.traceInfo(), context.rpcCreds(), extents);
 
         log.trace("{} allocate timestamps request to {} returned {} timestamps", fateId, server,
             timestamps.size());
@@ -333,10 +358,16 @@ class LoadFiles extends ManagerRepo {
 
     Loader loader = new Loader();
     long t1;
-    loader.start(bulkDir, manager, fateId, bulkInfo.setTime);
+    loader.start(bulkDir, manager, tableId, fateId, bulkInfo.setTime);
+
+    List<ColumnType> fetchCols = new ArrayList<>(List.of(PREV_ROW, LOCATION, LOADED, TIME));
+    if (loader.pauseLimit > 0) {
+      fetchCols.add(FILES);
+    }
+
     try (TabletsMetadata tabletsMetadata =
         TabletsMetadata.builder(manager.getContext()).forTable(tableId).overlapping(startRow, null)
-            .checkConsistency().fetch(PREV_ROW, LOCATION, LOADED, TIME).build()) {
+            .checkConsistency().fetch(fetchCols.toArray(new ColumnType[0])).build()) {
 
       // The tablet iterator and load mapping iterator are both iterating over data that is sorted
       // in the same way. The two iterators are each independently advanced to find common points in
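Taken together, the `Loader.load` changes above apply two per-tablet decisions before any conditional mutation is built. A tablet whose loaded markers already cover every file is dropped, a tablet holding only some of them fails a precondition, and a tablet whose current file count exceeds `table.file.pause` is counted as skipped and left for a later pass. The following is a minimal sketch of just that filtering, assuming a hypothetical `TabletSnapshot` record in place of Accumulo's `TabletMetadata` and plain file-name strings in place of `ReferencedTabletFile`; it is not the actual `LoadFiles` code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

final class LoadFilterSketch {
  // Hypothetical stand-in for the TabletMetadata fields the decision reads.
  record TabletSnapshot(String endRow, int fileCount, Set<String> loaded) {}

  long skipped = 0;

  List<TabletSnapshot> tabletsToLoad(List<TabletSnapshot> tablets, Set<String> toLoad,
      long pauseLimit) {
    List<TabletSnapshot> ready = new ArrayList<>();
    for (TabletSnapshot tablet : tablets) {
      boolean containsAll = tablet.loaded().containsAll(toLoad);
      // Loaded markers are written in one mutation, so a partial subset should never occur.
      if (!containsAll && !Collections.disjoint(tablet.loaded(), toLoad)) {
        throw new IllegalStateException("Tablet " + tablet.endRow() + " has a subset of loaded files");
      }
      if (containsAll) {
        continue; // already loaded by an earlier pass, nothing to do
      }
      // A non-positive limit disables pausing, mirroring the `pauseLimit > 0` guard.
      if (pauseLimit > 0 && tablet.fileCount() > pauseLimit) {
        skipped++; // retried later, after compactions reduce the file count
        continue;
      }
      ready.add(tablet);
    }
    return ready;
  }

  public static void main(String[] args) {
    var filter = new LoadFilterSketch();
    Set<String> toLoad = Set.of("f1.rf", "f2.rf");
    List<TabletSnapshot> tablets = List.of(
        new TabletSnapshot("0060", 3, Set.of()),
        new TabletSnapshot("0120", 9, Set.of()),
        new TabletSnapshot("null", 4, Set.of("f1.rf", "f2.rf")));
    List<TabletSnapshot> ready = filter.tabletsToLoad(tablets, toLoad, 5);
    // prints "1 ready, 1 skipped"
    System.out.println(ready.size() + " ready, " + filter.skipped + " skipped");
  }
}
```

Because `FILES` is added to the `requireSame` columns whenever `pauseLimit > 0`, the real conditional mutation appears designed to be rejected if a tablet's file set changed between the metadata read and the write, so the pause decision is not applied to a stale file count.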
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
index c2ae6f9cd7..b2d3521735 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
@@ -35,6 +35,7 @@ import java.util.function.Function;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
+import org.apache.accumulo.core.clientImpl.bulk.Bulk;
 import org.apache.accumulo.core.clientImpl.bulk.BulkImport;
 import org.apache.accumulo.core.clientImpl.bulk.BulkSerialize;
 import org.apache.accumulo.core.clientImpl.bulk.LoadMappingIterator;
@@ -116,8 +117,10 @@ public class PrepBulkImport extends ManagerRepo {
    */
   @VisibleForTesting
   static KeyExtent validateLoadMapping(String tableId, LoadMappingIterator lmi,
-      TabletIterFactory tabletIterFactory, int maxNumTablets) throws Exception {
+      TabletIterFactory tabletIterFactory, int maxNumTablets, int maxFilesPerTablet)
+      throws Exception {
     var currRange = lmi.next();
+    checkFilesPerTablet(tableId, maxFilesPerTablet, currRange);
 
     Text startRow = currRange.getKey().prevEndRow();
 
@@ -143,6 +146,7 @@ public class PrepBulkImport extends ManagerRepo {
           break;
         }
         currRange = lmi.next();
+        checkFilesPerTablet(tableId, maxFilesPerTablet, currRange);
         lastTablet = currRange.getKey();
       }
 
@@ -194,6 +198,17 @@ public class PrepBulkImport extends ManagerRepo {
     return new KeyExtent(firstTablet.tableId(), lastTablet.endRow(), firstTablet.prevEndRow());
   }
 
+  private static void checkFilesPerTablet(String tableId, int maxFilesPerTablet,
+      Map.Entry<KeyExtent,Bulk.Files> currRange) throws AcceptableThriftTableOperationException {
+    if (maxFilesPerTablet > 0 && currRange.getValue().getSize() > maxFilesPerTablet) {
+      throw new AcceptableThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT,
+          TableOperationExceptionType.OTHER,
+          "Attempted to import " + currRange.getValue().getSize()
+              + " files into a single tablet which exceeds the configured max of "
+              + maxFilesPerTablet);
+    }
+  }
+
   private static class TabletIterFactoryImpl implements TabletIterFactory {
     private final List<AutoCloseable> resourcesToClose = new ArrayList<>();
     private final Manager manager;
@@ -228,12 +243,15 @@ public class PrepBulkImport extends ManagerRepo {
 
     int maxTablets = manager.getContext().getTableConfiguration(bulkInfo.tableId)
         .getCount(Property.TABLE_BULK_MAX_TABLETS);
+    int maxFilesPerTablet = manager.getContext().getTableConfiguration(bulkInfo.tableId)
+        .getCount(Property.TABLE_BULK_MAX_TABLET_FILES);
 
     try (
         LoadMappingIterator lmi =
             BulkSerialize.readLoadMapping(bulkDir.toString(), bulkInfo.tableId, fs::open);
         TabletIterFactory tabletIterFactory = new TabletIterFactoryImpl(manager, bulkInfo)) {
-      return validateLoadMapping(bulkInfo.tableId.canonical(), lmi, tabletIterFactory, maxTablets);
+      return validateLoadMapping(bulkInfo.tableId.canonical(), lmi, tabletIterFactory, maxTablets,
+          maxFilesPerTablet);
     }
   }
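`checkFilesPerTablet` runs on each entry of the load mapping as `validateLoadMapping` walks it during the prep step, so a single over-limit tablet fails the whole bulk import before any tablet is modified, while an import whose files are spread across several tablets may exceed the limit in total. A value of `0` or less disables the check because of the `maxFilesPerTablet > 0` guard. A minimal sketch of that validation, assuming a hypothetical map from tablet end row to file count in place of the real `LoadMappingIterator` and `Bulk.Files`, and `IllegalArgumentException` in place of `AcceptableThriftTableOperationException`:

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class FilesPerTabletCheck {
  // Same condition and message text as checkFilesPerTablet in the diff.
  static void check(int fileCount, int maxFilesPerTablet) {
    if (maxFilesPerTablet > 0 && fileCount > maxFilesPerTablet) {
      throw new IllegalArgumentException("Attempted to import " + fileCount
          + " files into a single tablet which exceeds the configured max of "
          + maxFilesPerTablet);
    }
  }

  // Validates every tablet entry up front and returns the total file count if all pass.
  static int validateAll(Map<String,Integer> filesPerTablet, int maxFilesPerTablet) {
    int total = 0;
    for (Map.Entry<String,Integer> entry : filesPerTablet.entrySet()) {
      check(entry.getValue(), maxFilesPerTablet);
      total += entry.getValue();
    }
    return total;
  }

  public static void main(String[] args) {
    // Hypothetical mapping: 7 files for tablet "0100" and 1 for the last tablet.
    Map<String,Integer> mapping = new LinkedHashMap<>();
    mapping.put("0100", 7);
    mapping.put("null", 1);
    try {
      validateAll(mapping, 5);
    } catch (IllegalArgumentException e) {
      // prints "Attempted to import 7 files into a single tablet which exceeds the configured max of 5"
      System.out.println(e.getMessage());
    }
  }
}
```

With a limit of 5, a mapping of three files to each of two tablets passes even though it carries six files in total, which mirrors the final import in `BulkNewIT.testMaxTabletsPerFile`, where more files than the limit are imported but not into a single tablet.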
 
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java
index 8bff563391..339857ec20 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java
@@ -72,7 +72,6 @@ public class TabletRefresher {
       // request. There may also be tablets that had a location when the files were set but do not
       // have a location now, that is ok the next time that tablet loads somewhere it will see the
       // files.
-
       var tabletIterator =
           tablets.stream().filter(tabletMetadata -> tabletMetadata.getLocation() != null)
               .filter(needsRefresh).iterator();
diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java
index 6c34c6aad3..1872123797 100644
--- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java
+++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java
@@ -129,7 +129,7 @@ public class PrepBulkImportTest {
         .map(Text::toString).orElse(null);
 
     try (LoadMappingIterator lmi = createLoadMappingIter(loadRanges)) {
-      var extent = PrepBulkImport.validateLoadMapping("1", lmi, tabletIterFactory, maxTablets);
+      var extent = PrepBulkImport.validateLoadMapping("1", lmi, tabletIterFactory, maxTablets, 0);
       assertEquals(nke(minPrevEndRow, maxPrevEndRow), extent, loadRanges + " " + tabletRanges);
     }
   }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
index a815af72c3..0b5c246647 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
@@ -1181,7 +1181,7 @@ public class TabletClientHandler implements TabletServerClientService.Iface,
 
   @Override
   public Map<TKeyExtent,Long> allocateTimestamps(TInfo tinfo, TCredentials credentials,
-      List<TKeyExtent> extents, int numStamps) throws TException {
+      List<TKeyExtent> extents) throws TException {
     if (!security.canPerformSystemActions(credentials)) {
       throw new AccumuloSecurityException(credentials.getPrincipal(),
           SecurityErrorCode.PERMISSION_DENIED).asThriftException();
@@ -1195,8 +1195,7 @@ public class TabletClientHandler implements TabletServerClientService.Iface,
       var extent = KeyExtent.fromThrift(textent);
       Tablet tablet = tabletsSnapshot.get(extent);
       if (tablet != null) {
-        tablet.allocateTimestamp(numStamps)
-            .ifPresent(timestamp -> timestamps.put(textent, timestamp));
+        tablet.allocateTimestamp().ifPresent(timestamp -> timestamps.put(textent, timestamp));
       }
     }
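With `numStamps` removed, `allocateTimestamps` hands back at most one timestamp per requested extent, and only for extents that map to a tablet the server currently hosts and that is not closing; the other extents are simply absent from the returned map. On the manager side, when `setTime` is true, `LoadFiles` skips any tablet that has a location but no entry in `hostedTimestamps`, leaving it for a later retry. A hedged sketch of that exchange, with hypothetical `String` extents and an allocator function standing in for the Thrift types and the `Tablet` lookup:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.function.Function;

final class TimestampHandshake {
  // Server side: an extent only gets an entry if the allocator produces a timestamp for it.
  static Map<String,Long> allocateTimestamps(List<String> extents,
      Function<String,OptionalLong> allocator) {
    Map<String,Long> timestamps = new HashMap<>();
    for (String extent : extents) {
      allocator.apply(extent).ifPresent(ts -> timestamps.put(extent, ts));
    }
    return timestamps;
  }

  // Manager side: a hosted tablet without a timestamp is skipped on this pass.
  static boolean shouldSkip(boolean setTime, boolean hasLocation, String extent,
      Map<String,Long> hostedTimestamps) {
    return setTime && hasLocation && !hostedTimestamps.containsKey(extent);
  }

  public static void main(String[] args) {
    // Hypothetical server state: "t1" is hosted and open, "t2" is closing.
    Function<String,OptionalLong> allocator =
        extent -> extent.equals("t1") ? OptionalLong.of(42L) : OptionalLong.empty();
    Map<String,Long> reply = allocateTimestamps(List.of("t1", "t2"), allocator);
    System.out.println(reply); // {t1=42}
    System.out.println(shouldSkip(true, true, "t2", reply)); // true
  }
}
```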
 
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 23e1e790f2..a022910cd7 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -1689,8 +1689,8 @@ public class Tablet extends TabletBase {
         // Its expected that what is persisted should be less than equal to the time that tablet has
         // in memory.
         Preconditions.checkState(tabletMetadata.getTime().getTime() <= tabletTime.getTime(),
-            "Time in metadata is ahead of tablet %s memory:%s metadata:%s", extent, tabletTime,
-            tabletMetadata.getTime());
+            "Time in metadata is ahead of tablet %s memory:%s metadata:%s", extent,
+            tabletTime.getTime(), tabletMetadata.getTime());
 
         // must update latestMetadata before computeNumEntries() is called
         Preconditions.checkState(
@@ -1766,19 +1766,12 @@ public class Tablet extends TabletBase {
     return !activeScans.isEmpty() || writesInProgress > 0;
   }
 
-  public synchronized OptionalLong allocateTimestamp(int numStamps) {
+  public synchronized OptionalLong allocateTimestamp() {
     if (isClosing() || isClosed()) {
       return OptionalLong.empty();
     }
-
-    Preconditions.checkArgument(numStamps > 0);
-    long timestamp = Long.MIN_VALUE;
-    for (int i = 0; i < numStamps; i++) {
-      timestamp = tabletTime.getAndUpdateTime();
-    }
-
-    getTabletMemory().getCommitSession().updateMaxCommittedTime(timestamp);
-
-    return OptionalLong.of(timestamp);
+    var time = tabletTime.getAndUpdateTime();
+    getTabletMemory().getCommitSession().updateMaxCommittedTime(time);
+    return OptionalLong.of(time);
   }
 }
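The `Tablet.allocateTimestamp()` change pairs with the `LoadFiles` change above. Instead of reserving `numStamps` values and giving each file `hostedTimestamp + timeOffset`, a bulk import now uses one `fileTime` for every file it adds to a tablet, and the tablet server calls `getAndUpdateTime()` once per request. For an unhosted tablet the manager takes that value from the tablet's metadata time via `getAndUpdateTime()`; for a hosted tablet it uses the value the tablet server returned and calls `updateTimeIfGreater(fileTime)` so the persisted time does not fall behind. The sketch below models that choice with a hypothetical `LogicalClock`; its exact increment rules are an assumption for illustration, not a copy of Accumulo's `TabletTime`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

final class SingleStampSketch {
  // Hypothetical logical clock; `next` is the next value getAndUpdateTime() hands out.
  static final class LogicalClock {
    private long next;

    LogicalClock(long next) {
      this.next = next;
    }

    long getAndUpdateTime() {
      return next++;
    }

    // Advance so that future values are greater than an externally allocated time.
    void updateTimeIfGreater(long time) {
      if (time >= next) {
        next = time + 1;
      }
    }

    long peekNext() {
      return next;
    }
  }

  // Returns the timestamp assigned to each of numFiles files loaded into one tablet.
  static List<Long> fileTimes(int numFiles, LogicalClock metadataTime, OptionalLong hostedTime) {
    long fileTime;
    if (hostedTime.isEmpty()) {
      // Unhosted tablet: allocate from the time stored in tablet metadata.
      fileTime = metadataTime.getAndUpdateTime();
    } else {
      // Hosted tablet: the tablet server allocated the value; keep metadata time in step.
      fileTime = hostedTime.getAsLong();
      metadataTime.updateTimeIfGreater(fileTime);
    }
    List<Long> times = new ArrayList<>();
    for (int i = 0; i < numFiles; i++) {
      times.add(fileTime); // same value for every file, no per-file offset
    }
    return times;
  }

  public static void main(String[] args) {
    var clock = new LogicalClock(10);
    System.out.println(fileTimes(3, clock, OptionalLong.empty())); // [10, 10, 10]
    System.out.println(fileTimes(2, clock, OptionalLong.of(25))); // [25, 25]
    System.out.println(clock.peekNext()); // 26
  }
}
```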
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
index 1a3439919a..d2fa75ff26 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
@@ -23,6 +23,7 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED;
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -70,6 +71,7 @@ import org.apache.accumulo.core.client.admin.TabletAvailability;
 import org.apache.accumulo.core.client.admin.TabletInformation;
 import org.apache.accumulo.core.client.admin.TimeType;
 import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
@@ -94,6 +96,7 @@ import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.spi.crypto.NoCryptoServiceFactory;
+import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
 import org.apache.accumulo.minicluster.MemoryUnit;
@@ -116,6 +119,7 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import com.google.common.collect.MoreCollectors;
 import com.google.common.util.concurrent.MoreExecutors;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@@ -283,7 +287,7 @@ public class BulkNewIT extends SharedMiniClusterBase {
       writeData(dir + "/f1.", aconf, 0, 332);
 
       // For this import tablet should be hosted so the bulk import operation will have to
-      // coordinate getting time with the hosted tablet. The time should refect the batch writes
+      // coordinate getting time with the hosted tablet. The time should reflect the batch writes
       // just done.
       client.tableOperations().importDirectory(dir).to(tableName).tableTime(true).load();
 
@@ -348,6 +352,187 @@ public class BulkNewIT extends SharedMiniClusterBase {
     }
   }
 
+  @Test
+  public void testPause() throws Exception {
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      tableName = "testPause_table1";
+      NewTableConfiguration newTableConf = new NewTableConfiguration();
+      var props =
+          Map.of(Property.TABLE_FILE_PAUSE.getKey(), "5", Property.TABLE_MAJC_RATIO.getKey(), "20");
+      newTableConf.setProperties(props);
+      client.tableOperations().create(tableName, newTableConf);
+
+      addSplits(client, tableName, "0060 0120");
+      String dir = getDir("/testPause1-");
+
+      for (int i = 0; i < 18; i++) {
+        writeData(dir + "/f" + i + ".", aconf, i * 10, (i + 1) * 10 - 1);
+      }
+
+      client.tableOperations().importDirectory(dir).to(tableName).tableTime(true).load();
+      verifyData(client, tableName, 0, 179, false);
+
+      String dir2 = getDir("/testPause2-");
+
+      for (int i = 0; i < 18; i++) {
+        writeData(dir2 + "/f" + i + ".", aconf, i * 10, (i + 1) * 10 - 1, 1000);
+      }
+
+      // Start a second bulk import in background thread because it is expected this bulk import
+      // will hang because tablets are over the pause file limit.
+      ExecutorService executor = Executors.newFixedThreadPool(1);
+      var future = executor.submit(() -> {
+        client.tableOperations().importDirectory(dir2).to(tableName).tableTime(true).load();
+        return null;
+      });
+
+      // sleep a bit to give the bulk import a chance to run
+      UtilWaitThread.sleep(3000);
+      // bulk import should not have gone through it should be pausing because the tablet have too
+      // many files
+      assertFalse(future.isDone());
+      verifyData(client, tableName, 0, 179, false);
+
+      // Before the bulk import runs no tablets should have loaded flags set
+      assertEquals(Map.of("0060", 0, "0120", 0, "null", 0), countLoaded(client, tableName));
+      // compacting the first tablet should allow the import on that tablet to proceed
+      client.tableOperations().compact(tableName,
+          new CompactionConfig().setWait(true).setEndRow(new Text("0060")));
+      // Wait for the first tablets data to be updated by bulk import.
+      Wait.waitFor(
+          () -> Map.of("0060", 7, "0120", 0, "null", 0).equals(countLoaded(client, tableName)));
+
+      // The bulk imports on the other tablets should not have gone through, verify their data was
+      // not updated. Spot check a few rows in the other two tablets. The first tablet may or may
+      // not be updated on the tablet server at this point, so can not look at its data.
+      assertEquals(61L, readRowValue(client, tableName, 61));
+      assertEquals(100L, readRowValue(client, tableName, 100));
+      assertEquals(140L, readRowValue(client, tableName, 140));
+
+      // compact the entire table, should allow all bulk imports to go through
+      client.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
+      // wait for bulk import to complete
+      future.get();
+      // verify the values were updated by the bulk import that was paused
+      verifyData(client, tableName, 0, 179, 1000, false);
+      assertEquals(Map.of("0060", 0, "0120", 0, "null", 0), countLoaded(client, tableName));
+    }
+  }
+
+  @Test
+  public void testMaxTabletsPerFile() throws Exception {
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      tableName = "testMaxTabletsPerFile_table1";
+      NewTableConfiguration newTableConf = new NewTableConfiguration();
+      var props = Map.of(Property.TABLE_BULK_MAX_TABLET_FILES.getKey(), "5");
+      newTableConf.setProperties(props);
+      client.tableOperations().create(tableName, newTableConf);
+
+      String dir = getDir("/testBulkFileMFP-");
+
+      for (int i = 4; i < 8; i++) {
+        writeData(dir + "/f" + i + ".", aconf, i * 10, (i + 1) * 10 - 1);
+      }
+
+      // should be able to bulk import 4 files w/o issue
+      client.tableOperations().importDirectory(dir).to(tableName).load();
+
+      verifyData(client, tableName, 40, 79, false);
+
+      var dir2 = getDir("/testBulkFileMFP2-");
+      for (int i = 12; i < 18; i++) {
+        writeData(dir2 + "/f" + i + ".", aconf, i * 10, (i + 1) * 10 - 1);
+      }
+
+      var exception = assertThrows(AccumuloException.class,
+          () -> client.tableOperations().importDirectory(dir2).to(tableName).load());
+      var msg = ((ThriftTableOperationException) exception.getCause()).getDescription();
+      // message should contain the limit of 5 and the number of files attempted to import 6
+      assertTrue(msg.contains(" 5"), msg);
+      assertTrue(msg.contains(" 6"), msg);
+
+      // ensure no data was added to table
+      verifyData(client, tableName, 40, 79, false);
+
+      // tested a table w/ single tablet, now test a table w/ three tablets and try importing into
+      // the first, middle, and last tablet
+      addSplits(client, tableName, "0100 0200");
+
+      // try the first tablet
+      var dir3 = getDir("/testBulkFileMFP3-");
+      for (int i = 0; i < 7; i++) {
+        writeData(dir3 + "/f" + i + ".", aconf, i * 10, (i + 1) * 10 - 1);
+      }
+      // add single file for the last tablet, this does not exceed the limit however it should not
+      // go through
+      writeData(dir3 + "/f_last.", aconf, 300, 400);
+      exception = assertThrows(AccumuloException.class,
+          () -> client.tableOperations().importDirectory(dir3).to(tableName).load());
+      // verify no files were moved by the failed bulk import
+      assertEquals(8, Arrays.stream(
+          getCluster().getFileSystem().listStatus(new Path(dir3), f -> f.getName().endsWith(".rf")))
+          .count());
+      msg = ((ThriftTableOperationException) exception.getCause()).getDescription();
+      // message should contain the limit of 5 and the number of files attempted to import 7
+      assertTrue(msg.contains(" 5"), msg);
+      assertTrue(msg.contains(" 7"), msg);
+      verifyData(client, tableName, 40, 79, false);
+
+      // try the middle tablet
+      var dir4 = getDir("/testBulkFileMFP4-");
+      for (int i = 11; i < 17; i++) {
+        writeData(dir4 + "/f" + i + ".", aconf, i * 10, (i + 1) * 10 - 1);
+      }
+      // add single file for the last tablet, this does not exceed the limit however it should not
+      // go through
+      writeData(dir4 + "/f_last.", aconf, 300, 400);
+      exception = assertThrows(AccumuloException.class,
+          () -> client.tableOperations().importDirectory(dir4).to(tableName).load());
+      // verify no files were moved by the failed bulk import
+      assertEquals(7, Arrays.stream(
+          getCluster().getFileSystem().listStatus(new Path(dir4), f -> f.getName().endsWith(".rf")))
+          .count());
+      msg = ((ThriftTableOperationException) exception.getCause()).getDescription();
+      // message should contain the limit of 5 and the number of files attempted to import 6
+      assertTrue(msg.contains(" 5"), msg);
+      assertTrue(msg.contains(" 6"), msg);
+      verifyData(client, tableName, 40, 79, false);
+
+      // try the last tablet
+      var dir5 = getDir("/testBulkFileMFP5-");
+      for (int i = 21; i < 28; i++) {
+        writeData(dir5 + "/f" + i + ".", aconf, i * 10, (i + 1) * 10 - 1);
+      }
+      // add single file for the first tablet, this does not exceed the limit however it should not
+      // go through
+      writeData(dir5 + "/f_last.", aconf, 0, 10);
+      exception = assertThrows(AccumuloException.class,
+          () -> client.tableOperations().importDirectory(dir5).to(tableName).load());
+      // verify no files were moved by the failed bulk import
+      assertEquals(8, Arrays.stream(
+          getCluster().getFileSystem().listStatus(new Path(dir5), f -> f.getName().endsWith(".rf")))
+          .count());
+      msg = ((ThriftTableOperationException) exception.getCause()).getDescription();
+      // message should contain the limit of 5 and the number of files attempted to import 7
+      assertTrue(msg.contains(" 5"), msg);
+      assertTrue(msg.contains(" 7"), msg);
+      verifyData(client, tableName, 40, 79, false);
+
+      // test an import that has more files than the limit, but not in a single tablet so it should
+      // work
+      var dir6 = getDir("/testBulkFileMFP6-");
+      for (int i = 8; i < 14; i++) {
+        writeData(dir6 + "/f" + i + ".", aconf, i * 10, (i + 1) * 10 - 1);
+      }
+      client.tableOperations().importDirectory(dir6).to(tableName).load();
+      // verify the bulk import moved the files
+      assertEquals(0, Arrays.stream(
+          getCluster().getFileSystem().listStatus(new Path(dir6), f -> f.getName().endsWith(".rf")))
+          .count());
+      verifyData(client, tableName, 40, 139, false);
+    }
+  }
+
   private void testSingleTabletSingleFileNoSplits(AccumuloClient c, boolean offline)
       throws Exception {
     if (offline) {
@@ -926,8 +1111,32 @@ public class BulkNewIT extends SharedMiniClusterBase {
     client.tableOperations().addSplits(tableName, splits);
   }
 
-  private void verifyData(AccumuloClient client, String table, int start, int end, boolean setTime)
-      throws Exception {
+  private long readRowValue(AccumuloClient client, String table, int row) throws Exception {
+    try (var scanner = client.createScanner(table)) {
+      scanner.setRange(new Range(row(row)));
+      var value = scanner.stream().map(Entry::getValue).map(Value::toString)
+          .collect(MoreCollectors.onlyElement());
+      return Long.parseLong(value);
+    }
+  }
+
+  private Map<String,Integer> countLoaded(AccumuloClient client, String table) throws Exception {
+    var ctx = ((ClientContext) client);
+    var tableId = ctx.getTableId(table);
+
+    try (var tabletsMetadata = ctx.getAmple().readTablets().forTable(tableId).build()) {
+      Map<String,Integer> counts = new HashMap<>();
+      for (var tabletMetadata : tabletsMetadata) {
+        String endRow =
+            tabletMetadata.getEndRow() == null ? "null" : tabletMetadata.getEndRow().toString();
+        counts.put(endRow, tabletMetadata.getLoaded().size());
+      }
+      return counts;
+    }
+  }
+
+  private void verifyData(AccumuloClient client, String table, int start, int end, int valueOffset,
+      boolean setTime) throws Exception {
     try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) {
 
       Iterator<Entry<Key,Value>> iter = scanner.iterator();
@@ -945,7 +1154,7 @@ public class BulkNewIT extends SharedMiniClusterBase {
           throw new Exception("unexpected row " + entry.getKey() + " " + i);
         }
 
-        if (Integer.parseInt(entry.getValue().toString()) != i) {
+        if (Integer.parseInt(entry.getValue().toString()) != valueOffset + i) {
           throw new Exception("unexpected value " + entry + " " + i);
         }
 
@@ -960,6 +1169,11 @@ public class BulkNewIT extends SharedMiniClusterBase {
     }
   }
 
+  private void verifyData(AccumuloClient client, String table, int start, int end, boolean setTime)
+      throws Exception {
+    verifyData(client, table, start, end, 0, setTime);
+  }
+
   private void verifyMetadata(AccumuloClient client, String tableName,
       Map<String,Set<String>> expectedHashes) {
 
@@ -1001,7 +1215,7 @@ public class BulkNewIT extends SharedMiniClusterBase {
     return String.format("%04d", r);
   }
 
-  private String writeData(String file, AccumuloConfiguration aconf, int s, int e)
+  private String writeData(String file, AccumuloConfiguration aconf, int s, int e, int valueOffset)
       throws Exception {
     FileSystem fs = getCluster().getFileSystem();
     String filename = file + RFile.EXTENSION;
@@ -1011,13 +1225,18 @@ public class BulkNewIT extends SharedMiniClusterBase {
         .withTableConfiguration(aconf).build()) {
       writer.startDefaultLocalityGroup();
       for (int i = s; i <= e; i++) {
-        writer.append(new Key(new Text(row(i))), new Value(Integer.toString(i)));
+        writer.append(new Key(new Text(row(i))), new Value(Integer.toString(valueOffset + i)));
       }
     }
 
     return hash(filename);
   }
 
+  private String writeData(String file, AccumuloConfiguration aconf, int s, int e)
+      throws Exception {
+    return writeData(file, aconf, s, e, 0);
+  }
+
   /**
    * This constraint is used to simulate an error in the metadata write for a bulk import.
    */
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
index f15968e09f..9ddff41e0d 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
@@ -266,7 +266,7 @@ public class NullTserver {
 
     @Override
     public Map<TKeyExtent,Long> allocateTimestamps(TInfo tinfo, TCredentials credentials,
-        List<TKeyExtent> tablets, int numStamps) throws TException {
+        List<TKeyExtent> tablets) throws TException {
       return Map.of();
     }
   }
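The walkthrough in the commit message (`table.file.pause=30`, `table.bulk.max.tablet.files=100`, two 20-file imports taking a tablet to 40 files) can be continued with a small single-tablet model of the two checks. The per-import file limit is enforced up front, and the pause check compares the tablet's current file count, before anything is added, against the pause limit using `>` as `LoadFiles` does. Under that rule the second import still succeeds (20 is not over 30), and a third 20-file import would pause until a compaction lowers the count. The model below is a simplified sketch with hypothetical names; it assumes a compaction reduces the tablet to one file and ignores minor compactions and retry timing.

```java
final class BulkLimitsModel {
  enum Outcome { LOADED, PAUSED, REJECTED }

  private final int pauseLimit; // models table.file.pause; <= 0 means no pausing
  private final int maxTabletFiles; // models table.bulk.max.tablet.files; <= 0 means no limit
  int tabletFiles = 0; // current number of files in the single modeled tablet

  BulkLimitsModel(int pauseLimit, int maxTabletFiles) {
    this.pauseLimit = pauseLimit;
    this.maxTabletFiles = maxTabletFiles;
  }

  Outcome bulkImport(int files) {
    // Checked while preparing the import, before any tablet is changed.
    if (maxTabletFiles > 0 && files > maxTabletFiles) {
      return Outcome.REJECTED;
    }
    // Checked per tablet while loading; a paused import stays pending and is retried.
    if (pauseLimit > 0 && tabletFiles > pauseLimit) {
      return Outcome.PAUSED;
    }
    tabletFiles += files;
    return Outcome.LOADED;
  }

  // Assumption: a full compaction rewrites the tablet's files into one file.
  void compact() {
    if (tabletFiles > 0) {
      tabletFiles = 1;
    }
  }

  public static void main(String[] args) {
    var model = new BulkLimitsModel(30, 100);
    System.out.println(model.bulkImport(20) + " " + model.tabletFiles); // LOADED 20
    System.out.println(model.bulkImport(20) + " " + model.tabletFiles); // LOADED 40
    System.out.println(model.bulkImport(20) + " " + model.tabletFiles); // PAUSED 40
    model.compact();
    System.out.println(model.bulkImport(20) + " " + model.tabletFiles); // LOADED 21
    System.out.println(model.bulkImport(101) + " " + model.tabletFiles); // REJECTED 21
  }
}
```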
