[ 
https://issues.apache.org/jira/browse/GEODE-9016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17298299#comment-17298299
 ] 

ASF GitHub Bot commented on GEODE-9016:
---------------------------------------

kirklund commented on a change in pull request #6104:
URL: https://github.com/apache/geode/pull/6104#discussion_r590675288



##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java
##########
@@ -814,6 +818,46 @@ protected FilterRoutingInfo getRecipientFilterRouting(Set cacheOpRecipients) {
     return consolidated;
   }
 
+  @Override
+  protected void removeDestroyTokensFromCqResultKeys(FilterRoutingInfo filterRouting) {
+    for (InternalDistributedMember m : filterRouting.getMembers()) {
+      FilterInfo filterInfo = filterRouting.getFilterInfo(m);
+      if (filterInfo.getCQs() == null) {
+        continue;
+      }
+
+      CacheDistributionAdvisor.CacheProfile cf =
+          (CacheDistributionAdvisor.CacheProfile) ((Bucket) getRegion()).getPartitionedRegion()
+              .getCacheDistributionAdvisor().getProfile(m);
+
+      if (cf == null || cf.filterProfile == null || cf.filterProfile.isLocalProfile()
+          || cf.filterProfile.getCqMap().isEmpty()) {
+        continue;
+      }
+
+      for (Object value : cf.filterProfile.getCqMap().values()) {
+        ServerCQ cq = (ServerCQ) value;
+
+        for (Map.Entry<Long, Integer> e : filterInfo.getCQs().entrySet()) {
+          Long cqID = e.getKey();
+          // For the CQs satisfying the event with destroy CQEvent, remove
+          // the entry from CQ cache.
+          for (int i = 0; i < this.putAllData.length; i++) {
+            @Unretained
+            EntryEventImpl entryEvent = getEventForPosition(i);
+            if (entryEvent != null) {
+              if (cq != null && cq.getFilterID() != null && cq.getFilterID().equals(cqID)
+                  && (e.getValue().equals(MessageType.LOCAL_DESTROY))
+                  && entryEvent.getKey() != null) {
+                cq.removeFromCqResultKeys(entryEvent.getKey(), true);
+              }
+
+            }
+          }
+        }
+      }

Review comment:
       In the future, you might want to try extracting some of those 
inner loops or inner if-blocks into their own methods. Well-chosen names for 
those methods can really improve understanding. It's also too easy to get lost 
in deeply nested structures like this one.
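
A minimal sketch of what that extraction could look like, using simplified
stand-ins rather than the real Geode types. `StubCq`, `LOCAL_DESTROY`,
`OTHER_EVENT`, and the helper names `isLocalDestroyFor` and `removeKeysFromCq`
are all hypothetical, and a null entry in the key list stands in for a position
that has no event or no key. It is meant to show the shape of the refactoring,
not the actual change to DistributedPutAllOperation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ExtractedLoopsSketch {
  // Arbitrary values for this sketch; the real code compares against
  // MessageType.LOCAL_DESTROY.
  static final Integer LOCAL_DESTROY = 1;
  static final Integer OTHER_EVENT = 0;

  /** Hypothetical stand-in for ServerCQ that records which keys were removed. */
  static class StubCq {
    private final Long filterId;
    final List<Object> removedKeys = new ArrayList<>();

    StubCq(Long filterId) {
      this.filterId = filterId;
    }

    Long getFilterID() {
      return filterId;
    }

    void removeFromCqResultKeys(Object key, boolean isTokenMode) {
      removedKeys.add(key);
    }
  }

  /** Top-level loop: reads as "for each CQ and CQ event, remove destroyed keys". */
  static void removeDestroyedKeys(List<StubCq> cqs, Map<Long, Integer> cqEvents,
      List<String> eventKeys) {
    for (StubCq cq : cqs) {
      for (Map.Entry<Long, Integer> cqEvent : cqEvents.entrySet()) {
        if (isLocalDestroyFor(cq, cqEvent)) {
          removeKeysFromCq(cq, eventKeys);
        }
      }
    }
  }

  /** The former inner if-condition, now with a name that states its intent. */
  static boolean isLocalDestroyFor(StubCq cq, Map.Entry<Long, Integer> cqEvent) {
    return cq != null
        && cq.getFilterID() != null
        && cq.getFilterID().equals(cqEvent.getKey())
        && LOCAL_DESTROY.equals(cqEvent.getValue());
  }

  /** The former innermost loop; null keys are skipped, never passed on. */
  static void removeKeysFromCq(StubCq cq, List<String> eventKeys) {
    for (String key : eventKeys) {
      if (key != null) {
        cq.removeFromCqResultKeys(key, true);
      }
    }
  }

  public static void main(String[] args) {
    StubCq matching = new StubCq(7L);
    StubCq other = new StubCq(8L);
    Map<Long, Integer> cqEvents = new LinkedHashMap<>();
    cqEvents.put(7L, LOCAL_DESTROY);
    cqEvents.put(8L, OTHER_EVENT);

    removeDestroyedKeys(Arrays.asList(matching, other), cqEvents,
        Arrays.asList("k1", null, "k2"));

    System.out.println(matching.removedKeys); // prints [k1, k2]
    System.out.println(other.removedKeys); // prints []
  }
}
```

With the condition and the innermost loop named, the outer method is down to
two levels of nesting, and each helper can be read and tested on its own.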
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> NullPointerException during PutAll with CQ LOCAL_DESTROY event
> --------------------------------------------------------------
>
>                 Key: GEODE-9016
>                 URL: https://issues.apache.org/jira/browse/GEODE-9016
>             Project: Geode
>          Issue Type: Bug
>            Reporter: Jianxia Chen
>            Assignee: Jianxia Chen
>            Priority: Major
>              Labels: pull-request-available
>
> It is possible for a PutAll operation to hit an NPE when a CQ LOCAL_DESTROY 
> event is generated.
> {code:java}
> java.lang.NullPointerException
>         at java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
>         at org.apache.geode.cache.query.cq.internal.ServerCQResultsCachePartitionRegionImpl.remove(ServerCQResultsCachePartitionRegionImpl.java:69)
>         at org.apache.geode.cache.query.cq.internal.ServerCQImpl.removeFromCqResultKeys(ServerCQImpl.java:297)
>         at org.apache.geode.internal.cache.DistributedCacheOperation.removeDestroyTokensFromCqResultKeys(DistributedCacheOperation.java:743)
>         at org.apache.geode.internal.cache.DistributedCacheOperation._distribute(DistributedCacheOperation.java:693)
>         at org.apache.geode.internal.cache.DistributedCacheOperation.startOperation(DistributedCacheOperation.java:277)
>         at org.apache.geode.internal.cache.DistributedRegion.postPutAllSend(DistributedRegion.java:3304)
>         at org.apache.geode.internal.cache.LocalRegionDataView.postPutAll(LocalRegionDataView.java:358)
>         at org.apache.geode.internal.cache.partitioned.PutAllPRMessage.doPostPutAll(PutAllPRMessage.java:568)
>         at org.apache.geode.internal.cache.partitioned.PutAllPRMessage.doLocalPutAll(PutAllPRMessage.java:507)
>         at org.apache.geode.internal.cache.partitioned.PutAllPRMessage.operateOnPartitionedRegion(PutAllPRMessage.java:326)
>         at org.apache.geode.internal.cache.partitioned.PartitionMessage.process(PartitionMessage.java:333)
>         at org.apache.geode.distributed.internal.DistributionMessage.scheduleAction(DistributionMessage.java:376)
>         at org.apache.geode.distributed.internal.DistributionMessage$1.run(DistributionMessage.java:440)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at org.apache.geode.distributed.internal.ClusterOperationExecutors.runUntilShutdown(ClusterOperationExecutors.java:442)
>         at org.apache.geode.distributed.internal.ClusterOperationExecutors.doPartitionRegionThread(ClusterOperationExecutors.java:422)
>         at org.apache.geode.logging.internal.executors.LoggingThreadFactory.lambda$newThread$0(LoggingThreadFactory.java:119)
>         at java.lang.Thread.run(Thread.java:748)
> {code}
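
The top frame of the trace is ConcurrentHashMap.get, which throws a
NullPointerException when it is given a null key. The sketch below reproduces
only that JDK behavior and shows a null-key guard similar in spirit to the
`entryEvent.getKey() != null` check in the diff above. The class and method
names are hypothetical, and it does not claim to show how the null key arises
inside Geode.

```java
import java.util.concurrent.ConcurrentHashMap;

class NullKeyLookupSketch {
  /** Returns true if looking up the key throws a NullPointerException. */
  static boolean lookupThrowsNpe(ConcurrentHashMap<Object, Object> map, Object key) {
    try {
      map.get(key);
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  /** Guarded lookup: a null key is treated as "not present" instead of failing. */
  static Object safeLookup(ConcurrentHashMap<Object, Object> map, Object key) {
    return key == null ? null : map.get(key);
  }

  public static void main(String[] args) {
    ConcurrentHashMap<Object, Object> cache = new ConcurrentHashMap<>();
    cache.put("k1", "v1");

    System.out.println(lookupThrowsNpe(cache, null)); // prints true
    System.out.println(lookupThrowsNpe(cache, "k1")); // prints false
    System.out.println(safeLookup(cache, null)); // prints null
  }
}
```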



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
