[ 
https://issues.apache.org/jira/browse/GEODE-9016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17298471#comment-17298471
 ] 

ASF GitHub Bot commented on GEODE-9016:
---------------------------------------

DonalEvans commented on a change in pull request #6104:
URL: https://github.com/apache/geode/pull/6104#discussion_r590881622



##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
##########
@@ -716,7 +716,7 @@ protected void _distribute() {
    * This is similar to 
CacheClientNotifier.removeDestroyTokensFromCqResultKeys() where the
    * destroyed events for local CQs are handled.
    */
-  private void removeDestroyTokensFromCqResultKeys(FilterRoutingInfo 
filterRouting) {
+  protected void removeDestroyTokensFromCqResultKeys(FilterRoutingInfo 
filterRouting) {

Review comment:
       This method (and the overridden version in DistributedPutAllOperation) 
can be package-private instead of protected. It's best to keep visibility as 
restricted as possible.

##########
File path: 
geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionCqQueryDUnitTest.java
##########
@@ -120,6 +123,60 @@
 
   private static int bridgeServerPort;
 
+  @Test
+  public void testPutAllWithCQLocalDestroy() throws Exception {

Review comment:
       This method never throws an `Exception` so this is not necessary.

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java
##########
@@ -814,6 +818,46 @@ protected FilterRoutingInfo getRecipientFilterRouting(Set 
cacheOpRecipients) {
     return consolidated;
   }
 
+  @Override
+  protected void removeDestroyTokensFromCqResultKeys(FilterRoutingInfo 
filterRouting) {
+    for (InternalDistributedMember m : filterRouting.getMembers()) {
+      FilterInfo filterInfo = filterRouting.getFilterInfo(m);
+      if (filterInfo.getCQs() == null) {
+        continue;
+      }
+
+      CacheDistributionAdvisor.CacheProfile cf =
+          (CacheDistributionAdvisor.CacheProfile) ((Bucket) 
getRegion()).getPartitionedRegion()
+              .getCacheDistributionAdvisor().getProfile(m);
+
+      if (cf == null || cf.filterProfile == null || 
cf.filterProfile.isLocalProfile()
+          || cf.filterProfile.getCqMap().isEmpty()) {
+        continue;
+      }
+
+      for (Object value : cf.filterProfile.getCqMap().values()) {
+        ServerCQ cq = (ServerCQ) value;
+
+        for (Map.Entry<Long, Integer> e : filterInfo.getCQs().entrySet()) {
+          Long cqID = e.getKey();
+          // For the CQs satisfying the event with destroy CQEvent, remove
+          // the entry from CQ cache.
+          for (int i = 0; i < this.putAllData.length; i++) {

Review comment:
       Until this line, the code in this method and in 
`DistributedCacheOperation.removeDestroyTokensFromCqResultKeys
   ()` is identical. Would it be possible to extract the duplicated code to a 
method in `DistributedCacheOperation` that both classes could use, rather than 
duplicating it?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> NullPointerException during PutAll with CQ LOCAL_DESTROY event
> --------------------------------------------------------------
>
>                 Key: GEODE-9016
>                 URL: https://issues.apache.org/jira/browse/GEODE-9016
>             Project: Geode
>          Issue Type: Bug
>            Reporter: Jianxia Chen
>            Assignee: Jianxia Chen
>            Priority: Major
>              Labels: pull-request-available
>
> It is possible that PutAll operation hits a NPE when CQ LOCAL_DESTROY event 
> is generated.
> {code:java}
> java.lang.NullPointerException
>         at 
> java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
>         at 
> org.apache.geode.cache.query.cq.internal.ServerCQResultsCachePartitionRegionImpl.remove(ServerCQResultsCachePartitionRegionImpl.java:69)
>         at 
> org.apache.geode.cache.query.cq.internal.ServerCQImpl.removeFromCqResultKeys(ServerCQImpl.java:297)
>         at 
> org.apache.geode.internal.cache.DistributedCacheOperation.removeDestroyTokensFromCqResultKeys(DistributedCacheOperation.java:743)
>         at 
> org.apache.geode.internal.cache.DistributedCacheOperation._distribute(DistributedCacheOperation.java:693)
>         at 
> org.apache.geode.internal.cache.DistributedCacheOperation.startOperation(DistributedCacheOperation.java:277)
>         at 
> org.apache.geode.internal.cache.DistributedRegion.postPutAllSend(DistributedRegion.java:3304)
>         at 
> org.apache.geode.internal.cache.LocalRegionDataView.postPutAll(LocalRegionDataView.java:358)
>         at 
> org.apache.geode.internal.cache.partitioned.PutAllPRMessage.doPostPutAll(PutAllPRMessage.java:568)
>         at 
> org.apache.geode.internal.cache.partitioned.PutAllPRMessage.doLocalPutAll(PutAllPRMessage.java:507)
>         at 
> org.apache.geode.internal.cache.partitioned.PutAllPRMessage.operateOnPartitionedRegion(PutAllPRMessage.java:326)
>         at 
> org.apache.geode.internal.cache.partitioned.PartitionMessage.process(PartitionMessage.java:333)
>         at 
> org.apache.geode.distributed.internal.DistributionMessage.scheduleAction(DistributionMessage.java:376)
>         at 
> org.apache.geode.distributed.internal.DistributionMessage$1.run(DistributionMessage.java:440)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at 
> org.apache.geode.distributed.internal.ClusterOperationExecutors.runUntilShutdown(ClusterOperationExecutors.java:442)
>         at 
> org.apache.geode.distributed.internal.ClusterOperationExecutors.doPartitionRegionThread(ClusterOperationExecutors.java:422)
>         at 
> org.apache.geode.logging.internal.executors.LoggingThreadFactory.lambda$newThread$0(LoggingThreadFactory.java:119)
>         at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to