charlesconnell commented on code in PR #6978:
URL: https://github.com/apache/hbase/pull/6978#discussion_r2095959938


##########
hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateImplementation.java:
##########
@@ -0,0 +1,958 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import static 
org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance;
+import static 
org.apache.hadoop.hbase.quotas.RpcThrottlingException.Type.ReadSizeExceeded;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
+import org.apache.hadoop.hbase.quotas.OperationQuota;
+import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.RegionScannerImpl;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mockito.ArgumentCaptor;
+import org.mockito.stubbing.Answer;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AggregateProtos.AggregateRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AggregateProtos.AggregateResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+
+/**
+ * Test AggregateImplementation with throttling and partial results
+ */
+@Category({ SmallTests.class, CoprocessorTests.class })
+public class TestAggregateImplementation {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestAggregateImplementation.class);
+
+  @Rule
+  public TestName name = new TestName();
+
+  private static final byte[] CF = Bytes.toBytes("CF");
+  private static final byte[] CQ = Bytes.toBytes("CQ");
+  private static final int NUM_ROWS = 5;
+  private static final int THROTTLE_AT_ROW = 2;
+  private static final LongColumnInterpreter LONG_COLUMN_INTERPRETER = new 
LongColumnInterpreter();
+
+  private AggregateImplementation<Long, Long, HBaseProtos.LongMsg, 
HBaseProtos.LongMsg,
+    HBaseProtos.LongMsg> aggregate;
+  private RegionCoprocessorEnvironment env;
+  private HRegion region;
+  private RegionScannerImpl scanner;
+  private Scan scan;
+  private AggregateRequest request;
+  private RpcController controller;
+
+  @Before
+  public void setUp() throws Exception {
+    env = mock(RegionCoprocessorEnvironment.class);
+    region = mock(HRegion.class);
+    RegionCoprocessorHost host = mock(RegionCoprocessorHost.class);
+    when(env.getRegion()).thenReturn(region);
+    when(region.getCoprocessorHost()).thenReturn(host);
+
+    RegionInfo regionInfo = mock(RegionInfo.class);
+    when(region.getRegionInfo()).thenReturn(regionInfo);
+    when(regionInfo.getRegionNameAsString()).thenReturn("testRegion");
+
+    scan = new Scan().addColumn(CF, CQ);
+
+    scanner = mock(RegionScannerImpl.class);
+    doAnswer(createMockScanner()).when(scanner).next(any(List.class));
+    when(region.getScanner(any())).thenReturn(scanner);
+
+    doAnswer(createMockQuota()).when(env).checkScanQuota(any(), anyLong(), 
anyLong());
+
+    request = AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+      .setInterpreterClassName(LongColumnInterpreter.class.getName())
+      .setClientSupportsPartialResult(true).build();
+
+    controller = mock(RpcController.class);
+
+    aggregate = new AggregateImplementation<>();
+    aggregate.start(env);
+  }
+
+  private Answer<Boolean> createMockScanner() throws IOException {
+    AtomicInteger callCount = new AtomicInteger(0);
+    return invocation -> {
+      List<Cell> results = (List<Cell>) invocation.getArguments()[0];
+      int call = callCount.getAndIncrement();
+      if (call < NUM_ROWS) {
+        Cell cell = mock(Cell.class);
+        when(cell.getRowArray()).thenReturn(Bytes.toBytes("row" + (call + 1)));
+        when(cell.getRowOffset()).thenReturn(0);
+        when(cell.getRowLength()).thenReturn((short) 4);
+
+        when(cell.getValueArray()).thenReturn(Bytes.toBytes((long) call + 1));
+        when(cell.getValueOffset()).thenReturn(0);
+        when(cell.getValueLength()).thenReturn(8);
+        results.add(cell);
+        return call < NUM_ROWS - 1;
+      } else {
+        // No more rows
+        return false;
+      }
+    };
+  }
+
+  private Answer<OperationQuota> createMockQuota() throws IOException {
+    OperationQuota mockQuota = mock(OperationQuota.class);
+
+    final AtomicInteger rowCount = new AtomicInteger(0);
+
+    return invocation -> {
+      int count = rowCount.incrementAndGet();
+      if (count == THROTTLE_AT_ROW) {
+        // Create a throttling exception with wait interval
+        RpcThrottlingException throttlingEx =
+          new RpcThrottlingException(ReadSizeExceeded, 1000, "Throttled for 
testing");
+        throw throttlingEx;
+      }
+      return mockQuota;
+    };
+  }
+
+  private void reset() throws IOException {
+    // Create a non-throttling quota for the second call, since throttling
+    // should only happen on the first call
+    OperationQuota nonThrottlingQuota = mock(OperationQuota.class);
+    when(env.checkScanQuota(any(), anyLong(), 
anyLong())).thenReturn(nonThrottlingQuota);
+  }
+
+  @Test
+  public void testMaxWithThrottling() throws Exception {
+    ArgumentCaptor<AggregateResponse> responseCaptor =
+      ArgumentCaptor.forClass(AggregateResponse.class);
+    RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+    aggregate.getMax(controller, request, callback);
+
+    verify(callback).run(responseCaptor.capture());
+
+    AggregateResponse response = responseCaptor.getValue();
+    assertTrue("Response should indicate there are more rows", 
response.hasNextChunkStartRow());
+    assertEquals("Wait interval should be set", 1000, 
response.getWaitIntervalMs());
+    ByteString b = response.getFirstPart(0);
+    HBaseProtos.LongMsg q = 
getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3, b);
+    assertEquals(1L, (long) LONG_COLUMN_INTERPRETER.getCellValueFromProto(q));
+
+    // Create a second request with the next chunk start row
+    AggregateRequest request2 = AggregateRequest.newBuilder(request)
+      
.setScan(request.getScan().toBuilder().setStartRow(response.getNextChunkStartRow()).build())
+      .build();
+
+    // Reset throttles for second call
+    reset();
+
+    RpcCallback<AggregateResponse> callback2 = mock(RpcCallback.class);
+    aggregate.getMax(controller, request2, callback2);
+
+    // Verify second callback was called
+    verify(callback2).run(responseCaptor.capture());
+
+    // Verify the complete result
+    AggregateResponse response2 = responseCaptor.getValue();
+    b = response2.getFirstPart(0);
+    q = getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3, b);
+    assertEquals("Final max value should be correct", 5L,
+      (long) LONG_COLUMN_INTERPRETER.getCellValueFromProto(q));
+
+    assertFalse("Response should not indicate there are more rows",
+      response2.hasNextChunkStartRow());
+  }
+
+  @Test
+  public void testMaxThrottleWithNoResults() throws Exception {
+    AggregateRequest request = 
AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+      .setInterpreterClassName(LongColumnInterpreter.class.getName())
+      .setClientSupportsPartialResult(true).build();
+
+    when(env.checkScanQuota(any(), anyLong(), anyLong()))
+      .thenThrow(new RpcThrottlingException(ReadSizeExceeded, 1000, "Throttled 
for testing"));
+
+    ArgumentCaptor<AggregateResponse> responseCaptor =
+      ArgumentCaptor.forClass(AggregateResponse.class);
+    RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+    // call gets no results, and response should contain the same start row as 
the request, because
+    // no progress in the scan was made
+    aggregate.getMax(controller, request, callback);
+
+    verify(callback).run(responseCaptor.capture());
+
+    AggregateResponse response = responseCaptor.getValue();
+
+    assertTrue("Response should indicate there are more rows", 
response.hasNextChunkStartRow());
+    assertEquals("response should contain the same start row as the request",
+      request.getScan().getStartRow(), response.getNextChunkStartRow());
+  }
+
+  @Test
+  public void testMaxWithNoResults() throws Exception {
+    AggregateRequest request = 
AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+      .setInterpreterClassName(LongColumnInterpreter.class.getName())
+      .setClientSupportsPartialResult(false).build();
+
+    doAnswer(invocation -> false).when(scanner).next(any(List.class));
+
+    ArgumentCaptor<AggregateResponse> responseCaptor =
+      ArgumentCaptor.forClass(AggregateResponse.class);
+    RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+    aggregate.getMax(controller, request, callback);
+    verify(callback).run(responseCaptor.capture());
+    AggregateResponse response = responseCaptor.getValue();
+    assertNull(response);
+  }
+
+  @Test
+  public void testMaxDoesNotSupportPartialResults() throws Exception {
+    AggregateRequest noPartialRequest =
+      AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+        .setInterpreterClassName(LongColumnInterpreter.class.getName())
+        .setClientSupportsPartialResult(false).build();
+
+    OperationQuota nonThrottlingQuota = mock(OperationQuota.class);
+    when(env.checkScanQuota(any(), anyLong(), 
anyLong())).thenReturn(nonThrottlingQuota);
+
+    ArgumentCaptor<AggregateResponse> responseCaptor =
+      ArgumentCaptor.forClass(AggregateResponse.class);
+    RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+    // Call should complete without throttling
+    aggregate.getMax(controller, noPartialRequest, callback);
+
+    verify(callback).run(responseCaptor.capture());
+
+    AggregateResponse response = responseCaptor.getValue();
+
+    assertFalse("Response should not indicate there are more rows",
+      response.hasNextChunkStartRow());
+    ByteString b = response.getFirstPart(0);
+    HBaseProtos.LongMsg q = 
getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3, b);
+    assertEquals(5L, (long) LONG_COLUMN_INTERPRETER.getCellValueFromProto(q));
+  }
+

Review Comment:
   I have coverage of all supported combinations of behavior. `partial results 
are not supported and a throttle is triggered` is not a supported combination 
— when partial results are not supported, the coprocessor proceeds 
without checking quotas.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to