[ https://issues.apache.org/jira/browse/GEODE-5922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17258595#comment-17258595 ]
ASF GitHub Bot commented on GEODE-5922: --------------------------------------- echobravopapa commented on a change in pull request #5870: URL: https://github.com/apache/geode/pull/5870#discussion_r551647701 ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java ########## @@ -255,23 +247,18 @@ public void destroy() { } @Override - public boolean put(Object event) throws CacheException { - lock.writeLock().lock(); - try { - GatewaySenderEventImpl eventImpl = (GatewaySenderEventImpl) event; - final Region r = eventImpl.getRegion(); - final boolean isPDXRegion = - (r instanceof DistributedRegion && r.getName().equals(PeerTypeRegistration.REGION_NAME)); - final boolean isWbcl = - this.regionName.startsWith(AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX); - if (!(isPDXRegion && isWbcl)) { - putAndGetKey(event); - return true; - } - return false; - } finally { - lock.writeLock().unlock(); + public synchronized boolean put(Object event) throws CacheException { Review comment: Do we have testing around this now-`synchronized` method? ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java ########## @@ -316,71 +304,66 @@ public AsyncEvent take() throws CacheException { * have peeked. If the entry was not peeked, this method will silently return. 
*/ @Override - public void remove() throws CacheException { - lock.writeLock().lock(); + public synchronized void remove() throws CacheException { + if (peekedIds.isEmpty()) { + return; + } + Long key = peekedIds.remove(); + boolean isExtraPeeked = extraPeekedIds.remove(key); try { - if (peekedIds.isEmpty()) { - return; + // Increment the head key + if (!isExtraPeeked) { + updateHeadKey(key.longValue()); } - Long key = peekedIds.remove(); - boolean isExtraPeeked = extraPeekedIds.remove(key); - try { - // Increment the head key - if (!isExtraPeeked) { - updateHeadKey(key.longValue()); - } - removeIndex(key); - // Remove the entry at that key with a callback arg signifying it is - // a WAN queue so that AbstractRegionEntry.destroy can get the value - // even if it has been evicted to disk. In the normal case, the - // AbstractRegionEntry.destroy only gets the value in the VM. - this.region.localDestroy(key, WAN_QUEUE_TOKEN); - this.stats.decQueueSize(); - - } catch (EntryNotFoundException ok) { - // this is acceptable because the conflation can remove entries - // out from underneath us. - if (logger.isDebugEnabled()) { - logger.debug( - "{}: Did not destroy entry at {} it was not there. It should have been removed by conflation.", - this, key); - } + removeIndex(key); + // Remove the entry at that key with a callback arg signifying it is + // a WAN queue so that AbstractRegionEntry.destroy can get the value + // even if it has been evicted to disk. In the normal case, the + // AbstractRegionEntry.destroy only gets the value in the VM. + this.region.localDestroy(key, WAN_QUEUE_TOKEN); + this.stats.decQueueSize(); + + } catch (EntryNotFoundException ok) { + // this is acceptable because the conflation can remove entries + // out from underneath us. + if (logger.isDebugEnabled()) { + logger.debug( + "{}: Did not destroy entry at {} it was not there. 
It should have been removed by conflation.", + this, key); } + } - boolean wasEmpty = this.lastDispatchedKey == this.lastDestroyedKey; - if (!isExtraPeeked) { + boolean wasEmpty = this.lastDispatchedKey == this.lastDestroyedKey; + if (!isExtraPeeked) { + this.lastDispatchedKey = key; Review comment: I was going to ask why this was re-ordered, but it's just the diff making it look that way... ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > SerialGatewaySenderQueue concurrency is poorly implemented > ---------------------------------------------------------- > > Key: GEODE-5922 > URL: https://issues.apache.org/jira/browse/GEODE-5922 > Project: Geode > Issue Type: Improvement > Components: wan > Reporter: Bruce J Schuchardt > Assignee: Bruce J Schuchardt > Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > Time Spent: 20m > Remaining Estimate: 0h > > This class uses synchronization on the queue to limit access to one put at a > time. Synchronization isn't a fair locking mechanism so threads can be > blocked trying to add events to the queue while other more recent events get > the lock and insert their events. This causes inconsistent latency which > I've observed being as long as 30 seconds, causing client connections to be > shut down by the ClientHealthMonitor. -- This message was sent by Atlassian Jira (v8.3.4#803005)