Apache9 commented on code in PR #7363:
URL: https://github.com/apache/hbase/pull/7363#discussion_r2467620700


##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java:
##########
@@ -118,33 +175,43 @@ protected void internalFlush() {
 
   @Override
   public List<CompletableFuture<Void>> mutate(List<? extends Mutation> 
mutations) {
-    List<CompletableFuture<Void>> futures =
-      Stream.<CompletableFuture<Void>> 
generate(CompletableFuture::new).limit(mutations.size())
-        .collect(Collectors.toList());
+    List<CompletableFuture<Void>> futures = new ArrayList<>(mutations.size());
+    for (int i = 0, n = mutations.size(); i < n; i++) {
+      futures.add(new CompletableFuture<>());
+    }
+    if (closed) {
+      IOException ioe = new IOException("Already closed");
+      futures.forEach(f -> f.completeExceptionally(ioe));
+      return futures;
+    }
     long heapSize = 0;
     for (Mutation mutation : mutations) {
       heapSize += mutation.heapSize();
       if (mutation instanceof Put) {
         validatePut((Put) mutation, maxKeyValueSize);
       }
     }
-    synchronized (this) {
-      if (closed) {
-        IOException ioe = new IOException("Already closed");
-        futures.forEach(f -> f.completeExceptionally(ioe));
-        return futures;
-      }
+    Batch batch = null;
+    lock.lock();
+    try {
       if (this.mutations.isEmpty() && periodicFlushTimeoutNs > 0) {
         periodicFlushTask = periodicalFlushTimer.newTimeout(timeout -> {
-          synchronized (AsyncBufferedMutatorImpl.this) {
+          boolean shouldFlush = false;
+          lock.lock();
+          try {
             // confirm that we are still valid, if there is already an 
internalFlush call before us,
             // then we should not execute anymore. And in internalFlush we 
will set periodicFlush
             // to null, and since we may schedule a new one, so here we check 
whether the references
             // are equal.
             if (timeout == periodicFlushTask) {
               periodicFlushTask = null;
-              internalFlush();
+              shouldFlush = true;

Review Comment:
   I think here we should also use use drainBatch and then send the batch 
outside lock pattern, Otherwise I'm afraid there could still be race...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to