This is an automated email from the ASF dual-hosted git repository.
alberto pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 6cb76751a7 GEODE-10420: Finish distribute() work if interrupted (#7854)
6cb76751a7 is described below
commit 6cb76751a78d2dab90d5bf8ab8eda881f89717b0
Author: Alberto Gomez <[email protected]>
AuthorDate: Mon Sep 12 15:12:14 2022 +0200
GEODE-10420: Finish distribute() work if interrupted (#7854)
An event that a gateway sender should be notified of
can be lost if the thread is interrupted while the
event is being distributed.
The reason is that when the distribute() method
in AbstractGatewaySender catches an
InterruptedException, it just returns without
either putting the event in the queue or
dropping it.
The fix is to finish handling the event correctly
(put it in the queue or drop it) when the InterruptedException
is caught, and to set the interrupt flag again before
the method returns so that the caller is aware of the interruption.
---
.../geode/internal/cache/EntryEventImpl.java | 5 +-
.../internal/cache/wan/AbstractGatewaySender.java | 20 ++-
.../cache/wan/AbstractGatewaySenderTest.java | 176 +++++++++++++++++++++
3 files changed, 192 insertions(+), 9 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
index 16adbeca7a..6a521becbe 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
@@ -340,8 +340,9 @@ public class EntryEventImpl implements InternalEntryEvent,
InternalCacheEvent,
op = other.op;
distributedMember = other.distributedMember;
filterInfo = other.filterInfo;
- keyInfo = other.keyInfo.isDistKeyInfo() ? new
DistTxKeyInfo((DistTxKeyInfo) other.keyInfo)
- : new KeyInfo(other.keyInfo);
+ keyInfo =
+ other.getKeyInfo().isDistKeyInfo() ? new DistTxKeyInfo((DistTxKeyInfo)
other.getKeyInfo())
+ : new KeyInfo(other.getKeyInfo());
if (other.getRawCallbackArgument() instanceof
GatewaySenderEventCallbackArgument) {
keyInfo.setCallbackArg((new GatewaySenderEventCallbackArgument(
(GatewaySenderEventCallbackArgument)
other.getRawCallbackArgument())));
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 09dc843f3a..f96cdfe415 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -1042,6 +1042,7 @@ public abstract class AbstractGatewaySender implements
InternalGatewaySender, Di
List<Integer> allRemoteDSIds, boolean isLastEventInTransaction) {
final boolean isDebugEnabled = logger.isDebugEnabled();
+ boolean wasInterrupted = false;
// released by this method or transfers ownership to TmpQueueEvent
@Released
@@ -1156,15 +1157,17 @@ public abstract class AbstractGatewaySender implements
InternalGatewaySender, Di
}
}
if (enqueuedAllTempQueueEvents) {
- try {
- while (!getLifeCycleLock().readLock().tryLock(10,
TimeUnit.MILLISECONDS)) {
- if (!getIsRunningAndDropEventIfNotRunning(event, isDebugEnabled,
clonedEvent)) {
- return;
+ while (true) {
+ try {
+ while (!getLifeCycleLock().readLock().tryLock(10,
TimeUnit.MILLISECONDS)) {
+ if (!getIsRunningAndDropEventIfNotRunning(event,
isDebugEnabled, clonedEvent)) {
+ return;
+ }
}
+ break;
+ } catch (InterruptedException e) {
+ wasInterrupted = true;
}
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return;
}
}
}
@@ -1213,6 +1216,9 @@ public abstract class AbstractGatewaySender implements
InternalGatewaySender, Di
if (freeClonedEvent) {
clonedEvent.release(); // fix for bug 48035
}
+ if (wasInterrupted) {
+ Thread.currentThread().interrupt();
+ }
}
}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderTest.java
index aac5f0d3c0..193a120458 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderTest.java
@@ -18,15 +18,28 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import org.junit.Test;
+import org.apache.geode.cache.CacheException;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.Operation;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.wan.GatewayQueueEvent;
+import org.apache.geode.distributed.internal.DistributionAdvisor;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EnumListenerEvent;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.KeyInfo;
import org.apache.geode.internal.cache.RegionQueue;
public class AbstractGatewaySenderTest {
@@ -58,4 +71,167 @@ public class AbstractGatewaySenderTest {
assertThat(event).isSameAs(gatewaySenderEvent);
}
+
+ @Test
+ public void distributeFinishesWorkWhenInterrupted() throws
InterruptedException {
+ DummyGatewaySenderEventProcessor processor = new
DummyGatewaySenderEventProcessor();
+ TestableGatewaySender gatewaySender = new TestableGatewaySender(processor);
+ EnumListenerEvent operationType = EnumListenerEvent.AFTER_CREATE;
+ EntryEventImpl event = mock(EntryEventImpl.class);
+ when(event.getKeyInfo()).thenReturn(mock(KeyInfo.class));
+ Operation operation = mock(Operation.class);
+ when(operation.isLocal()).thenReturn(false);
+ when(operation.isExpiration()).thenReturn(false);
+ when(event.getOperation()).thenReturn(operation);
+ InternalRegion region = mock(InternalRegion.class);
+ when(region.getDataPolicy()).thenReturn(DataPolicy.PARTITION);
+ when(event.getRegion()).thenReturn(region);
+ List<Integer> allRemoteDSIds = Collections.singletonList(1);
+
+ CountDownLatch lockAcquiredLatch = new CountDownLatch(1);
+ CountDownLatch unlockLatch = new CountDownLatch(1);
+
+ // Get lifeCycleLock in write mode in new thread so that
+ // the thread calling distribute will not be able
+ // to acquire it
+ Thread thread = new Thread(() -> {
+ gatewaySender.getLifeCycleLock().writeLock().lock();
+ lockAcquiredLatch.countDown();
+ try {
+ unlockLatch.await();
+ } catch (InterruptedException ignore) {
+ }
+ gatewaySender.getLifeCycleLock().writeLock().unlock();
+ });
+ thread.start();
+ lockAcquiredLatch.await();
+
+ // Send interrupted and then call distribute
+ Thread.currentThread().interrupt();
+ gatewaySender.distribute(operationType, event, allRemoteDSIds, true);
+
+ unlockLatch.countDown();
+
+ // Check that the interrupted exception has been reset
+ assertThat(Thread.currentThread().isInterrupted()).isTrue();
+ // Check that the work was finished even if the interrupt signal was set
+
assertThat(processor.getTimesRegisterEventDroppedInPrimaryQueueCalled()).isEqualTo(1);
+ }
+
+ public static class TestableGatewaySender extends AbstractGatewaySender {
+ private int isRunningTimesCalled = 0;
+
+ public TestableGatewaySender(AbstractGatewaySenderEventProcessor
eventProcessor) {
+ this.eventProcessor = eventProcessor;
+ enqueuedAllTempQueueEvents = true;
+ }
+
+ @Override
+ public void fillInProfile(DistributionAdvisor.Profile profile) {}
+
+ @Override
+ public void start() {}
+
+ @Override
+ public boolean isPrimary() {
+ return true;
+ }
+
+ @Override
+ public void startWithCleanQueue() {}
+
+ @Override
+ public void prepareForStop() {}
+
+ @Override
+ public void stop() {}
+
+ @Override
+ public void setModifiedEventId(EntryEventImpl clonedEvent) {}
+
+ @Override
+ public GatewaySenderStats getStatistics() {
+ return mock(GatewaySenderStats.class);
+ }
+
+ @Override
+ public GatewaySenderAdvisor getSenderAdvisor() {
+ return mock(GatewaySenderAdvisor.class);
+ }
+
+ @Override
+ public boolean isRunning() {
+ if (isRunningTimesCalled++ == 0) {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public String getId() {
+ return "test";
+ }
+ }
+
+ public static class DummyGatewaySenderEventProcessor extends
AbstractGatewaySenderEventProcessor {
+
+ private int timesEnqueueEventCalled = 0;
+ private int timesRegisterEventDroppedInPrimaryQueueCalled = 0;
+
+ public DummyGatewaySenderEventProcessor() {
+ super("", new DummyGatewaySender(), null);
+ }
+
+ @Override
+ public void enqueueEvent(EnumListenerEvent operation, EntryEvent event,
Object substituteValue,
+ boolean isLastEventInTransaction) throws IOException, CacheException {
+ timesEnqueueEventCalled++;
+ }
+
+ public int getTimesEnqueueEventCalled() {
+ return timesEnqueueEventCalled;
+ }
+
+ @Override
+ protected void initializeMessageQueue(String id, boolean cleanQueues) {}
+
+ @Override
+ protected void rebalance() {}
+
+ public int getTimesRegisterEventDroppedInPrimaryQueueCalled() {
+ return timesRegisterEventDroppedInPrimaryQueueCalled;
+ }
+
+ @Override
+ protected void registerEventDroppedInPrimaryQueue(EntryEventImpl
droppedEvent) {
+ timesRegisterEventDroppedInPrimaryQueueCalled++;
+ }
+
+ @Override
+ public void initializeEventDispatcher() {}
+
+ @Override
+ protected void enqueueEvent(GatewayQueueEvent event) {}
+ }
+
+ public static class DummyGatewaySender extends AbstractGatewaySender {
+ @Override
+ public void fillInProfile(DistributionAdvisor.Profile profile) {}
+
+ @Override
+ public void start() {}
+
+ @Override
+ public void startWithCleanQueue() {}
+
+ @Override
+ public void prepareForStop() {}
+
+ @Override
+ public void stop() {}
+
+ @Override
+ public void setModifiedEventId(EntryEventImpl clonedEvent) {}
+
+ }
}