apurtell commented on code in PR #7363:
URL: https://github.com/apache/hbase/pull/7363#discussion_r2449675520


##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java:
##########
@@ -118,59 +158,93 @@ protected void internalFlush() {
 
   @Override
   public List<CompletableFuture<Void>> mutate(List<? extends Mutation> mutations) {
-    List<CompletableFuture<Void>> futures =
-      Stream.<CompletableFuture<Void>> generate(CompletableFuture::new).limit(mutations.size())
-        .collect(Collectors.toList());
+    List<CompletableFuture<Void>> futures = new ArrayList<>(mutations.size());
+    for (int i = 0, n = mutations.size(); i < n; i++) {
+      futures.add(new CompletableFuture<>());
+    }
+    if (closed) {
+      IOException ioe = new IOException("Already closed");
+      futures.forEach(f -> f.completeExceptionally(ioe));
+      return futures;
+    }
     long heapSize = 0;
     for (Mutation mutation : mutations) {
       heapSize += mutation.heapSize();
       if (mutation instanceof Put) {
         validatePut((Put) mutation, maxKeyValueSize);
       }
     }
-    synchronized (this) {
-      if (closed) {
-        IOException ioe = new IOException("Already closed");
-        futures.forEach(f -> f.completeExceptionally(ioe));
-        return futures;
-      }
+    boolean needFlush = false;
+    FlushType flushType = FlushType.SIZE;
+    lock.lock();
+    try {
       if (this.mutations.isEmpty() && periodicFlushTimeoutNs > 0) {
         periodicFlushTask = periodicalFlushTimer.newTimeout(timeout -> {
-          synchronized (AsyncBufferedMutatorImpl.this) {
+          boolean shouldFlush = false;
+          lock.lock();
+          try {
             // confirm that we are still valid, if there is already an internalFlush call before us,
             // then we should not execute anymore. And in internalFlush we will set periodicFlush
             // to null, and since we may schedule a new one, so here we check whether the references
             // are equal.
             if (timeout == periodicFlushTask) {
               periodicFlushTask = null;
-              internalFlush();
+              shouldFlush = true;
             }
+          } finally {
+            lock.unlock();
+          }
+          if (shouldFlush) {
+            internalFlush(FlushType.PERIODIC);
           }
         }, periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
       }
+      // Preallocate to avoid potentially multiple resizes during addAll
+      this.mutations.ensureCapacity(this.mutations.size() + mutations.size());
+      this.futures.ensureCapacity(this.futures.size() + futures.size());

Review Comment:
   The calls to `ensureCapacity` were recommended (I forget exactly which
   resource I was looking at), but they do seem redundant: `ArrayList.addAll`
   already grows the backing array at most once to fit the incoming elements.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@hbase.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to