krconv commented on code in PR #7131:
URL: https://github.com/apache/hbase/pull/7131#discussion_r2201119420


##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java:
##########
@@ -796,75 +804,165 @@ private boolean locateFinished(RegionInfo region, byte[] 
endKey, boolean endKeyI
 
   private <S, R> void coprocessorServiceUntilComplete(Function<RpcChannel, S> 
stubMaker,
     ServiceCaller<S, R> callable, PartialResultCoprocessorCallback<S, R> 
callback,
-    AtomicBoolean locateFinished, AtomicInteger unfinishedRequest, RegionInfo 
region, Span span) {
+    MultiRegionCoprocessorServiceProgress<R> progress, RegionInfo region,
+    AtomicBoolean claimedRegionAssignment, Span span) {
     addListener(coprocessorService(stubMaker, callable, region, 
region.getStartKey()), (r, e) -> {
       try (Scope ignored = span.makeCurrent()) {
-        if (e != null) {
-          callback.onRegionError(region, e);
+        if (e instanceof RegionNoLongerExistsException) {

Review Comment:
   Previously, this error would fail the region. With these changes, we'll 
retry the range that this error affects instead.



##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java:
##########
@@ -796,75 +804,165 @@ private boolean locateFinished(RegionInfo region, byte[] 
endKey, boolean endKeyI
 
   private <S, R> void coprocessorServiceUntilComplete(Function<RpcChannel, S> 
stubMaker,
     ServiceCaller<S, R> callable, PartialResultCoprocessorCallback<S, R> 
callback,
-    AtomicBoolean locateFinished, AtomicInteger unfinishedRequest, RegionInfo 
region, Span span) {
+    MultiRegionCoprocessorServiceProgress<R> progress, RegionInfo region,
+    AtomicBoolean claimedRegionAssignment, Span span) {
     addListener(coprocessorService(stubMaker, callable, region, 
region.getStartKey()), (r, e) -> {
       try (Scope ignored = span.makeCurrent()) {
-        if (e != null) {
-          callback.onRegionError(region, e);
+        if (e instanceof RegionNoLongerExistsException) {
+          RegionInfo newRegion = ((RegionNoLongerExistsException) 
e).getNewRegionInfo();
+          if 
(progress.markRegionReplacedExistingAndCheckNeedsToBeRestarted(newRegion)) {

Review Comment:
   This is an optimization for the case where regions have been merged: it 
avoids sending multiple requests to the new (merged) region.



##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java:
##########
@@ -796,75 +804,165 @@ private boolean locateFinished(RegionInfo region, byte[] 
endKey, boolean endKeyI
 
   private <S, R> void coprocessorServiceUntilComplete(Function<RpcChannel, S> 
stubMaker,
     ServiceCaller<S, R> callable, PartialResultCoprocessorCallback<S, R> 
callback,
-    AtomicBoolean locateFinished, AtomicInteger unfinishedRequest, RegionInfo 
region, Span span) {
+    MultiRegionCoprocessorServiceProgress<R> progress, RegionInfo region,
+    AtomicBoolean claimedRegionAssignment, Span span) {
     addListener(coprocessorService(stubMaker, callable, region, 
region.getStartKey()), (r, e) -> {
       try (Scope ignored = span.makeCurrent()) {
-        if (e != null) {
-          callback.onRegionError(region, e);
+        if (e instanceof RegionNoLongerExistsException) {
+          RegionInfo newRegion = ((RegionNoLongerExistsException) 
e).getNewRegionInfo();
+          if 
(progress.markRegionReplacedExistingAndCheckNeedsToBeRestarted(newRegion)) {
+            LOG.debug(
+              "Attempted to send a coprocessor service RPC to region {} which 
no"
+                + " longer exists, will attempt to send RPCs to the region(s) 
that replaced it",
+              region.getEncodedName());
+            restartCoprocessorServiceForRange(stubMaker, callable, callback, 
region.getStartKey(),
+              region.getEndKey(), newRegion, progress, span);
+          }
+          progress.markRegionFinished(region);
         } else {
-          callback.onRegionComplete(region, r);
-        }
+          if (!claimedRegionAssignment.get()) {
+            if (!progress.tryClaimRegionAssignment(region)) {
+              progress.markRegionFinished(region);
+              return;
+            } else {
+              claimedRegionAssignment.set(true);
+            }
+          }
+          progress.onResponse(region, r, e);
 
-        ServiceCaller<S, R> updatedCallable;
-        if (e == null && r != null) {
-          updatedCallable = callback.getNextCallable(r, region);
-        } else {
-          updatedCallable = null;
-        }
+          ServiceCaller<S, R> updatedCallable;
+          if (e == null && r != null) {
+            updatedCallable = callback.getNextCallable(r, region);
+          } else {
+            updatedCallable = null;
+          }
 
-        // If updatedCallable is non-null, we will be sending another request, 
so no need to
-        // decrement unfinishedRequest (recall that && short-circuits).
-        // If updatedCallable is null, and unfinishedRequest decrements to 0, 
we're done with the
-        // requests for this coprocessor call.
-        if (
-          updatedCallable == null && unfinishedRequest.decrementAndGet() == 0
-            && locateFinished.get()
-        ) {
-          callback.onComplete();
-        } else if (updatedCallable != null) {
-          Duration waitInterval = callback.getWaitInterval(r, region);
-          LOG.trace("Coprocessor returned incomplete result. "
-            + "Sleeping for {} before making follow-up request.", 
waitInterval);
-          if (!waitInterval.isZero()) {
-            AsyncConnectionImpl.RETRY_TIMER.newTimeout(
-              (timeout) -> coprocessorServiceUntilComplete(stubMaker, 
updatedCallable, callback,
-                locateFinished, unfinishedRequest, region, span),
-              waitInterval.toMillis(), TimeUnit.MILLISECONDS);
+          // If updatedCallable is non-null, we will be sending another 
request, so no need to
+          // decrement unfinishedRequest (recall that && short-circuits).
+          // If updatedCallable is null, and unfinishedRequest decrements to 
0, we're done with the
+          // requests for this coprocessor call.
+          if (updatedCallable == null) {
+            progress.markRegionFinished(region);
           } else {
-            coprocessorServiceUntilComplete(stubMaker, updatedCallable, 
callback, locateFinished,
-              unfinishedRequest, region, span);
+            Duration waitInterval = callback.getWaitInterval(r, region);
+            LOG.trace("Coprocessor returned incomplete result. "
+              + "Sleeping for {} before making follow-up request.", 
waitInterval);
+            if (!waitInterval.isZero()) {
+              AsyncConnectionImpl.RETRY_TIMER.newTimeout(
+                (timeout) -> coprocessorServiceUntilComplete(stubMaker, 
updatedCallable, callback,
+                  progress, region, claimedRegionAssignment, span),
+                waitInterval.toMillis(), TimeUnit.MILLISECONDS);
+            } else {
+              coprocessorServiceUntilComplete(stubMaker, updatedCallable, 
callback, progress,
+                region, claimedRegionAssignment, span);
+            }
           }
         }
+      } catch (Throwable t) {
+        if (claimedRegionAssignment.get() || 
progress.tryClaimRegionAssignment(region)) {
+          LOG.error("Error processing coprocessor service response for region 
{}",
+            region.getEncodedName(), t);
+          progress.onResponse(region, null, t);
+        }
+        progress.markRegionFinished(region);

Review Comment:
   I noticed that if `getNextCallable` throws, we never call `onComplete`; 
this catch clause fixes that.



##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java:
##########
@@ -944,4 +1051,208 @@ public <S, R> CoprocessorServiceBuilder<S, R> 
coprocessorService(
     PartialResultCoprocessorCallback<S, R> callback) {
     return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback);
   }
+
+  /**
+   * Coordinates coprocessor service responses across multiple regions when 
executing range queries.
+   * <p>
+   * When clients send coprocessor RPCs to a range, the process involves 
locating every region in
+   * that range and sending an RPC to a row believed to be in each region. 
However, the actual
+   * region may differ from what's in the meta cache, leading to complex race 
conditions that this
+   * class manages.
+   * <p>
+   * Race conditions handled:
+   * <ul>
+   * <li><b>Region merges:</b> Multiple original requests consolidate into a 
single request; this
+   * class provides leader election to ensure only one RPC is sent for the 
merged range</li>
+   * <li><b>Region splits:</b> One original request becomes multiple requests; 
the service is
+   * restarted for the affected range to cover all resulting regions</li>
+   * <li><b>Region alignment changes:</b> When multiple original requests 
cover part of a new
+   * region, but it doesn't fall into a simple split or merge case, the 
response is deduplicated
+   * after it finishes executing</li>
+   * <li><b>Region movement during execution:</b> If a coprocessor has already 
started on a region
+   * but subsequent calls detect the region has changed, an error is reported 
as the request cannot
+   * be recovered</li>
+   * </ul>
+   * <p>
+   * The class maintains region progress state to prevent duplicate response 
processing and ensures
+   * proper calling of Callback::onComplete when all regions have finished 
processing.
+   */
+  private static class MultiRegionCoprocessorServiceProgress<R> {
+    private final AtomicBoolean locateFinished = new AtomicBoolean(false);
+    private final ConcurrentNavigableMap<RegionInfo, RegionProgress> regions =
+      new ConcurrentSkipListMap<>();

Review Comment:
   I went this direction (enforce a strict contract on the callback and avoid 
sending duplicate RPCs where possible) because I think it is the easiest way to 
get this right. Another approach I considered was having some mechanism that 
coordinated the RPCs before we sent them, but I found that difficult for four 
reasons:
    1. I think reasoning about the ranges that needed to be re-processed would 
have been less intuitive because it'd need to explicitly handle all the edge 
cases here
    2. Wouldn't have been able to recurse in the same way to simplify the 
problem (because the method signature doesn't expose a way to manage RPCs like 
that)
    3. Would have required some sort of locking to have all requests wait to 
send responses until we have confirmation that the regions are where we thought 
they were (i.e. the meta cache was up to date)
    4. It wouldn't have been any more correct, and would only slightly reduce 
the number of actual RPCs that we'd send in this very unlikely situation (we 
only send duplicate RPCs in the overlapping assignment situation from below)
   
   



##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java:
##########
@@ -796,75 +804,165 @@ private boolean locateFinished(RegionInfo region, byte[] 
endKey, boolean endKeyI
 
   private <S, R> void coprocessorServiceUntilComplete(Function<RpcChannel, S> 
stubMaker,
     ServiceCaller<S, R> callable, PartialResultCoprocessorCallback<S, R> 
callback,
-    AtomicBoolean locateFinished, AtomicInteger unfinishedRequest, RegionInfo 
region, Span span) {
+    MultiRegionCoprocessorServiceProgress<R> progress, RegionInfo region,
+    AtomicBoolean claimedRegionAssignment, Span span) {
     addListener(coprocessorService(stubMaker, callable, region, 
region.getStartKey()), (r, e) -> {
       try (Scope ignored = span.makeCurrent()) {
-        if (e != null) {
-          callback.onRegionError(region, e);
+        if (e instanceof RegionNoLongerExistsException) {
+          RegionInfo newRegion = ((RegionNoLongerExistsException) 
e).getNewRegionInfo();
+          if 
(progress.markRegionReplacedExistingAndCheckNeedsToBeRestarted(newRegion)) {
+            LOG.debug(
+              "Attempted to send a coprocessor service RPC to region {} which 
no"
+                + " longer exists, will attempt to send RPCs to the region(s) 
that replaced it",
+              region.getEncodedName());
+            restartCoprocessorServiceForRange(stubMaker, callable, callback, 
region.getStartKey(),
+              region.getEndKey(), newRegion, progress, span);
+          }
+          progress.markRegionFinished(region);
         } else {
-          callback.onRegionComplete(region, r);
-        }
+          if (!claimedRegionAssignment.get()) {
+            if (!progress.tryClaimRegionAssignment(region)) {

Review Comment:
   There are two paths we need to consider now.
    * Original call
    * Recursive call (i.e. when we restart the request on the failed range)
   
   Both paths attempt to claim an "assignment" for the region after they 
receive a response, before actually propagating the response to the client's 
`CoprocessorCallback`.



##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java:
##########
@@ -944,4 +1051,208 @@ public <S, R> CoprocessorServiceBuilder<S, R> 
coprocessorService(
     PartialResultCoprocessorCallback<S, R> callback) {
     return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback);
   }
+
+  /**
+   * Coordinates coprocessor service responses across multiple regions when 
executing range queries.
+   * <p>
+   * When clients send coprocessor RPCs to a range, the process involves 
locating every region in
+   * that range and sending an RPC to a row believed to be in each region. 
However, the actual
+   * region may differ from what's in the meta cache, leading to complex race 
conditions that this
+   * class manages.
+   * <p>
+   * Race conditions handled:
+   * <ul>
+   * <li><b>Region merges:</b> Multiple original requests consolidate into a 
single request; this
+   * class provides leader election to ensure only one RPC is sent for the 
merged range</li>
+   * <li><b>Region splits:</b> One original request becomes multiple requests; 
the service is
+   * restarted for the affected range to cover all resulting regions</li>
+   * <li><b>Region alignment changes:</b> When multiple original requests 
cover part of a new
+   * region, but it doesn't fall into a simple split or merge case, the 
response is deduplicated
+   * after it finishes executing</li>
+   * <li><b>Region movement during execution:</b> If a coprocessor has already 
started on a region
+   * but subsequent calls detect the region has changed, an error is reported 
as the request cannot
+   * be recovered</li>
+   * </ul>
+   * <p>
+   * The class maintains region progress state to prevent duplicate response 
processing and ensures
+   * proper calling of Callback::onComplete when all regions have finished 
processing.
+   */
+  private static class MultiRegionCoprocessorServiceProgress<R> {
+    private final AtomicBoolean locateFinished = new AtomicBoolean(false);
+    private final ConcurrentNavigableMap<RegionInfo, RegionProgress> regions =
+      new ConcurrentSkipListMap<>();

Review Comment:
   The largest potential risk I see in affecting existing functionality is that 
this assignment mechanism blocks a response that would have been allowed 
before. It's imposing some strict constraints on how the methods in 
`PartialResultCoprocessorCallback` can be called (that I think are intuitive)
   
    * `onRegionComplete` can only be called once per region
    * `onRegionError` can be called any number of times per region
    * `onComplete` will be the last call to the callback and will be called 
once if there isn't an error
    * `onError` can be called any number of times, can happen at any time, and 
`onComplete` will not be called
   
   For callbacks that require multiple RPCs per region (e.g. a paging 
mechanism), the same is true except that the `onRegionComplete` will only 
_return_ one chain of RPCs (where a chain is make rpc call -> continue until 
getNextCallable() returns null)
   
   



##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java:
##########
@@ -944,4 +1051,208 @@ public <S, R> CoprocessorServiceBuilder<S, R> 
coprocessorService(
     PartialResultCoprocessorCallback<S, R> callback) {
     return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback);
   }
+
+  /**
+   * Coordinates coprocessor service responses across multiple regions when 
executing range queries.
+   * <p>
+   * When clients send coprocessor RPCs to a range, the process involves 
locating every region in
+   * that range and sending an RPC to a row believed to be in each region. 
However, the actual
+   * region may differ from what's in the meta cache, leading to complex race 
conditions that this
+   * class manages.
+   * <p>
+   * Race conditions handled:
+   * <ul>
+   * <li><b>Region merges:</b> Multiple original requests consolidate into a 
single request; this
+   * class provides leader election to ensure only one RPC is sent for the 
merged range</li>
+   * <li><b>Region splits:</b> One original request becomes multiple requests; 
the service is
+   * restarted for the affected range to cover all resulting regions</li>
+   * <li><b>Region alignment changes:</b> When multiple original requests 
cover part of a new
+   * region, but it doesn't fall into a simple split or merge case, the 
response is deduplicated
+   * after it finishes executing</li>
+   * <li><b>Region movement during execution:</b> If a coprocessor has already 
started on a region
+   * but subsequent calls detect the region has changed, an error is reported 
as the request cannot
+   * be recovered</li>
+   * </ul>
+   * <p>
+   * The class maintains region progress state to prevent duplicate response 
processing and ensures
+   * proper calling of Callback::onComplete when all regions have finished 
processing.
+   */
+  private static class MultiRegionCoprocessorServiceProgress<R> {
+    private final AtomicBoolean locateFinished = new AtomicBoolean(false);
+    private final ConcurrentNavigableMap<RegionInfo, RegionProgress> regions =
+      new ConcurrentSkipListMap<>();
+    private final CoprocessorCallback<R> callback;
+
+    private MultiRegionCoprocessorServiceProgress(CoprocessorCallback<R> 
callback) {
+      this.callback = callback;
+    }
+
+    void markLocateFinished() {
+      locateFinished.set(true);
+    }
+
+    void markRegionLocated(RegionInfo region) {
+      tryUpdateRegionProgress(region, RegionProgress.NONE, 
RegionProgress.LOCATED);
+    }
+
+    /**
+     * Try to claim the assignment for the region in order to return the 
response to the caller.
+     * This ensure that there is only ever one set of RPCs returned for a 
region.
+     * @return true if claimed successfully, false otherwise
+     */
+    boolean tryClaimRegionAssignment(RegionInfo region) {
+      boolean isNewlyAssigned = tryUpdateRegionProgress(region,
+        EnumSet.of(RegionProgress.NONE, RegionProgress.LOCATED), 
RegionProgress.ASSIGNED);
+      if (!isNewlyAssigned) {
+        return false;
+      }
+      List<Pair<RegionInfo, RegionProgress>> overlappingAssignments =
+        getOverlappingRegionAssignments(region);
+      if (!overlappingAssignments.isEmpty()) {
+        callback.onRegionError(region, new DoNotRetryIOException(
+          ("Region %s has replaced a region that the coprocessor service 
already started on, likely"
+            + " due to a region merge. Overlapping regions: %s")
+            .formatted(region.getEncodedName(), 
overlappingAssignments.stream().map(Pair::getFirst)
+              .map(RegionInfo::getEncodedName).collect(Collectors.joining(", 
")))));
+        tryUpdateRegionProgress(region, RegionProgress.ASSIGNED, 
RegionProgress.CANCELLED);
+        return false;
+      }

Review Comment:
   Overlapping assignments is a tricky one to explain;
   Say we have this scenario:
   Region A: 0x00 -> 0x88
   Region B: 0x88 -> 0xFF
   
   To send coprocessors to all regions, we'd call:
   coprocessorService(0x00)
   coprocessorService(0x88)
   
   Let's say we thought A and B were the regions when the request started 
because that was what was in the meta cache, but actually the regions looked 
like this:
   
   Region C: 0x00 -> 0x55
   Region D: 0x55 -> 0xAA
   Region E: 0xAA -> 0xFF
   
   If we go through this process to retry each of the failed regions, the 
restarted requests would re-run locations like this:
   locateRange(0x00 -> 0x88): Region C, Region D
   locateRange(0x88 -> 0xFF): Region D, Region E
   
   And the actual RPCs would look like this
   coprocessorService(0x00)
   coprocessorService(0x55)
   
   coprocessorService(0x55)
   coprocessorService(0x88)
   
   Region D would get retried twice; there is no way from the client to 
atomically locate the region and send the RPC to it, so instead this PR 
attempts to catch this case when we get the response from region D for the 
second time.
   
   Furthermore, this same race condition can happen when we are sending a 
coprocessor service that requires multiple RPCs to be sent per region, which is 
the case here where we'd see an overlapping assignment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@hbase.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to