Apache9 commented on code in PR #7363:
URL: https://github.com/apache/hbase/pull/7363#discussion_r2483722358


##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java:
##########
@@ -153,24 +220,45 @@ Stream.<CompletableFuture<Void>> generate(CompletableFuture::new).limit(mutation
       bufferedSize += heapSize;
       if (bufferedSize >= writeBufferSize) {
         LOG.trace("Flushing because write buffer size {} reached", writeBufferSize);
-        internalFlush();
-      } else if (maxMutations > 0 && this.mutations.size() >= maxMutations) {
+        // drain now and send after releasing the lock
+        batch = drainBatch();
+      } else if (this.mutations.size() >= maxMutations) {
         LOG.trace("Flushing because max mutations {} reached", maxMutations);
-        internalFlush();
+        batch = drainBatch();
       }
+    } finally {
+      lock.unlock();
+    }
+    // Send outside of lock
+    if (batch != null) {
+      sendBatch(batch);
     }
     return futures;
   }
 
   @Override
-  public synchronized void flush() {
+  public void flush() {
     internalFlush();

Review Comment:
   Ah, OK, there is still a call here, but I think we'd better implement the flush inline here and remove the internalFlush method.
   ```
       Batch batch = null;
       if (!closed) {
         lock.lock();
         try {
           if (!closed) {
             batch = drainBatch(); // Drains under lock
           }
         } finally {
           lock.unlock();
         }
       }
       // Send the batch
       if (batch != null) {
         sendBatch(batch); // Sends outside of lock
       }
   ```
   
   The only difference compared to the close method is that we do not need to set closed to true.
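
The drain-under-lock, send-outside-lock shape suggested above can be sketched in isolation. This is a minimal illustration, not the HBase implementation: `DrainThenSendSketch`, its `String` items, the `flushOrClose` helper, and the recording lists are all hypothetical stand-ins, and the unlocked first check of `closed` from the snippet above is omitted for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for a buffered mutator; not the HBase class.
class DrainThenSendSketch {
  private final ReentrantLock lock = new ReentrantLock();
  private List<String> buffered = new ArrayList<>();
  private boolean closed;

  // Recorded only so the behavior can be observed in this sketch.
  final List<List<String>> sent = new ArrayList<>();
  final List<Boolean> lockHeldDuringSend = new ArrayList<>();

  void add(String item) {
    lock.lock();
    try {
      if (closed) {
        throw new IllegalStateException("closed");
      }
      buffered.add(item);
    } finally {
      lock.unlock();
    }
  }

  // Caller must hold the lock. Returns null when nothing is buffered.
  private List<String> drainBatch() {
    if (buffered.isEmpty()) {
      return null;
    }
    List<String> batch = buffered;
    buffered = new ArrayList<>();
    return batch;
  }

  // Runs without the lock, so slow sends do not block concurrent add() calls.
  private void sendBatch(List<String> batch) {
    lockHeldDuringSend.add(lock.isHeldByCurrentThread());
    sent.add(batch);
  }

  void flush() {
    flushOrClose(false);
  }

  void close() {
    flushOrClose(true);
  }

  // flush and close differ only in whether closed is set to true.
  private void flushOrClose(boolean markClosed) {
    List<String> batch = null;
    lock.lock();
    try {
      if (closed) {
        return;
      }
      if (markClosed) {
        closed = true;
      }
      batch = drainBatch(); // drains under lock
    } finally {
      lock.unlock();
    }
    if (batch != null) {
      sendBatch(batch); // sends outside of lock
    }
  }
}
```

In this sketch a flush on an empty buffer sends nothing, and a flush after close is a no-op.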



##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java:
##########
@@ -90,21 +105,63 @@ public Configuration getConfiguration() {
 
   // will be overridden in test
   protected void internalFlush() {
-    if (periodicFlushTask != null) {
-      periodicFlushTask.cancel();
-      periodicFlushTask = null;
+    Batch batch = drainBatch(); // Drains under lock
+    if (batch == null) {
+      return;
+    }
+    sendBatch(batch); // Sends outside of lock
+  }
+
+  /**
+   * Atomically drains the current buffered mutations and futures under {@link #lock} and prepares
+   * this mutator to accept a new batch.
+   * <p>
+   * Acquires {@link #lock} and cancels any pending {@link #periodicFlushTask} to avoid a redundant
+   * flush for the data we are about to send. Swaps the shared {@link #mutations} and
+   * {@link #futures} lists into a returned {@link Batch}, replaces them with fresh lists, and
+   * resets {@link #bufferedSize} to zero.
+   * <p>
+   * If there is nothing buffered, returns {@code null} so callers can skip sending work.
+   * @return a {@link Batch} containing drained mutations and futures, or {@code null} if empty
+   */
+  private Batch drainBatch() {
+    ArrayList<Mutation> toSend;
+    ArrayList<CompletableFuture<Void>> toComplete;
+    lock.lock();

Review Comment:
   I think we already hold the lock when calling this method, so we do not need to lock it again.
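
To illustrate the point, here is a small sketch that assumes `lock` is a `java.util.concurrent.locks.ReentrantLock` (the declaration is not shown in this diff). With a reentrant lock, locking again inside the callee would not deadlock; it only raises the hold count, which makes the inner acquisition redundant. The class and method names below are hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch; assumes the lock is a ReentrantLock.
class NestedLockSketch {
  static final ReentrantLock lock = new ReentrantLock();

  // Variant that acquires the lock again inside the callee.
  static int drainWithOwnLock() {
    lock.lock();
    try {
      return lock.getHoldCount();
    } finally {
      lock.unlock();
    }
  }

  // Variant that relies on the caller already holding the lock.
  static int drainAssumingLockHeld() {
    if (!lock.isHeldByCurrentThread()) {
      throw new IllegalStateException("caller must hold the lock");
    }
    return lock.getHoldCount();
  }

  // Calls both variants while holding the lock and returns the hold counts seen.
  static int[] callBothUnderLock() {
    lock.lock();
    try {
      return new int[] { drainWithOwnLock(), drainAssumingLockHeld() };
    } finally {
      lock.unlock();
    }
  }
}
```

Calling `callBothUnderLock()` yields hold counts of 2 and 1 respectively: the nested acquisition is harmless but unnecessary, and a cheap `isHeldByCurrentThread()` check can document the precondition instead.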



##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java:
##########
@@ -90,21 +105,63 @@ public Configuration getConfiguration() {
 
   // will be overridden in test
   protected void internalFlush() {

Review Comment:
   Do we still need this method? It seems we do not call it anywhere.
   
   In TestAsyncBufferedMutator, I think we should override the drainBatch method now.
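
One way the test hook could move is sketched below. Since a `private` method cannot be overridden in Java, this assumes `drainBatch` would be widened to `protected`; whether the PR does that is not shown in this thread. `MutatorSketch` and `CountingMutatorSketch` are hypothetical classes, and locking is left out to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical base class standing in for the mutator implementation.
class MutatorSketch {
  private List<String> buffered = new ArrayList<>();

  void mutate(String mutation) {
    buffered.add(mutation);
  }

  // protected rather than private, so a test subclass can override it.
  protected List<String> drainBatch() {
    if (buffered.isEmpty()) {
      return null;
    }
    List<String> batch = buffered;
    buffered = new ArrayList<>();
    return batch;
  }

  // Returns the number of mutations handed off, or 0 if nothing was buffered.
  int flush() {
    List<String> batch = drainBatch();
    return batch == null ? 0 : batch.size();
  }
}

// Hypothetical test subclass that counts how often a drain happens.
class CountingMutatorSketch extends MutatorSketch {
  final AtomicInteger drainCount = new AtomicInteger();

  @Override
  protected List<String> drainBatch() {
    drainCount.incrementAndGet();
    return super.drainBatch();
  }
}
```

A test could then assert on `drainCount` after a sequence of `mutate` and `flush` calls, in place of counting calls to `internalFlush`.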


