Several AsyncEventListener implementations in our test code are not thread-safe, and this is causing the tests that use them to be flaky. I discovered this while trying to fix the flakiness in some disabled tests in AsyncEventListenerDUnitTest.
The issue here is that Geode can invoke AsyncEventListener.processEvents from multiple threads concurrently. So any implementation should either keep no state in the AsyncEventListener implementation or manage that state in a thread-safe way. The following are AsyncEventListener implementations in the test source:

1) org.apache.geode.internal.cache.wan.MyAsyncEventListener2 (this example fixes existing flakiness in several tests)

```java
25 public class MyAsyncEventListener2 implements AsyncEventListener {
26
27   private Map<Integer, List<GatewaySenderEventImpl>> bucketToEventsMap;
28
29   public MyAsyncEventListener2() {
30     this.bucketToEventsMap = new HashMap<Integer, List<GatewaySenderEventImpl>>();
31   }
32
33   public boolean processEvents(List<AsyncEvent> events) {
34     for (AsyncEvent event : events) {
35       GatewaySenderEventImpl gatewayEvent = (GatewaySenderEventImpl) event;
36       int bucketId = gatewayEvent.getBucketId();
37       List<GatewaySenderEventImpl> bucketEvents = this.bucketToEventsMap.get(bucketId);
38       if (bucketEvents == null) {
39         bucketEvents = new ArrayList<GatewaySenderEventImpl>();
40         bucketEvents.add(gatewayEvent);
41         this.bucketToEventsMap.put(bucketId, bucketEvents);
42       } else {
43         bucketEvents.add(gatewayEvent);
44       }
45     }
46     return true;
47   }
```

Multiple threads can invoke processEvents concurrently. Try enabling AsyncEventListenerDUnitTest.testParallelAsyncEventQueueHA_Scenario1 by removing the @Ignore annotation and then running it until failure in IntelliJ. You'll see it fail.
Now edit AsyncEventQueueTestBase.verifyAsyncEventListenerForPossibleDuplicates to assert the size of each bucket's event list:

```diff
@@ -20,6 +20,7 @@ import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
 import static org.apache.geode.distributed.ConfigurationProperties.OFF_HEAP_MEMORY_SIZE;
 import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
 import static org.apache.geode.distributed.ConfigurationProperties.START_LOCATOR;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -1343,6 +1344,7 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase {
       LogWriterUtils.getLogWriter()
           .info("Events for bucket: " + bucketId + " is " + eventsForBucket);
       assertNotNull(eventsForBucket);
+      assertThat(eventsForBucket.size()).as("bucketToEventsMap: " + bucketToEventsMap).isEqualTo(batchSize);
       for (int i = 0; i < batchSize; i++) {
         GatewaySenderEventImpl senderEvent = eventsForBucket.get(i);
         assertTrue(senderEvent.getPossibleDuplicate());
```

Now if you run it until failure, you'll see that the contents of bucketToEventsMap are a mess, including duplicate keys. The reason is that two threads executing MyAsyncEventListener2.processEvents concurrently can both see bucketEvents == null on line 38, so they both perform their own put on line 41. Changing MyAsyncEventListener2's bucketToEventsMap from HashMap to ConcurrentHashMap isn't the best solution, because the method accesses and mutates bucketToEventsMap across multiple lines (i.e., non-atomically): the get on line 37 and the put on line 41 would still be separate operations, and the ArrayList values would still be appended to without synchronization. In this case, the better fix is to mark processEvents as synchronized.
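A minimal sketch of the synchronized fix, assuming a hypothetical stand-in `BucketEvent` class in place of Geode's `AsyncEvent`/`GatewaySenderEventImpl` so the example compiles without Geode. The class name `SynchronizedBucketListener` and the helpers `snapshot` and `runConcurrently` are illustrative only, not existing Geode test code. Marking `processEvents` as `synchronized` makes the whole get/null-check/put/add sequence a single unit, and the `snapshot` accessor takes the same lock so a verifying thread reads a consistent copy.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SynchronizedBucketListener {

  /** Hypothetical stand-in for GatewaySenderEventImpl; only the bucket id matters here. */
  static final class BucketEvent {
    final int bucketId;

    BucketEvent(int bucketId) {
      this.bucketId = bucketId;
    }
  }

  private final Map<Integer, List<BucketEvent>> bucketToEventsMap = new HashMap<>();

  // synchronized: the get / null-check / put / add sequence runs as one unit,
  // so two threads can no longer both see null and both put their own list.
  public synchronized boolean processEvents(List<BucketEvent> events) {
    for (BucketEvent event : events) {
      List<BucketEvent> bucketEvents = bucketToEventsMap.get(event.bucketId);
      if (bucketEvents == null) {
        bucketEvents = new ArrayList<>();
        bucketToEventsMap.put(event.bucketId, bucketEvents);
      }
      bucketEvents.add(event);
    }
    return true;
  }

  // Copies the map under the same lock so a verifying thread never reads
  // a list that processEvents is still appending to.
  public synchronized Map<Integer, List<BucketEvent>> snapshot() {
    Map<Integer, List<BucketEvent>> copy = new HashMap<>();
    bucketToEventsMap.forEach((bucket, list) -> copy.put(bucket, new ArrayList<>(list)));
    return copy;
  }

  // Hypothetical driver: calls processEvents from several threads at once,
  // each batch holding one event per bucket.
  static Map<Integer, List<BucketEvent>> runConcurrently(int threads, int batchesPerThread,
      int buckets) throws InterruptedException {
    SynchronizedBucketListener listener = new SynchronizedBucketListener();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    CountDownLatch start = new CountDownLatch(1);
    for (int t = 0; t < threads; t++) {
      pool.execute(() -> {
        try {
          start.await(); // release all workers together to maximize contention
          for (int b = 0; b < batchesPerThread; b++) {
            List<BucketEvent> batch = new ArrayList<>();
            for (int bucket = 0; bucket < buckets; bucket++) {
              batch.add(new BucketEvent(bucket));
            }
            listener.processEvents(batch);
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }
    start.countDown();
    pool.shutdown();
    if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
      throw new IllegalStateException("workers did not finish");
    }
    return listener.snapshot();
  }

  public static void main(String[] args) throws InterruptedException {
    Map<Integer, List<BucketEvent>> result = runConcurrently(8, 1_000, 4);
    // With the lock in place, every bucket holds exactly 8 * 1_000 events.
    result.forEach((bucket, list) -> System.out.println("bucket " + bucket + ": " + list.size()));
  }
}
```

Running `main` should report 8000 events for each of the four buckets. Without the `synchronized` keyword on `processEvents`, those totals are no longer guaranteed.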
2) org.apache.geode.internal.cache.wan.asyncqueue.AbstractMovingAsyncEventListener (this example helps prevent future flakiness through defensive coding with proper encapsulation)

This one doesn't cause any flakiness *yet*, but because the member fields are non-private, it's very easy for a developer to write a new flaky test simply by modifying the values of these member fields from a subclass or from any test within the same package.

```java
28 public abstract class AbstractMovingAsyncEventListener implements AsyncEventListener {
29   protected final DistributedMember destination;
30   boolean moved;
31   Set<Object> keysSeen = new HashSet<Object>();
32
33   public AbstractMovingAsyncEventListener(final DistributedMember destination) {
34     this.destination = destination;
35   }
36
37   @Override
38   public boolean processEvents(final List<AsyncEvent> events) {
39     if (!moved) {
40
41       AsyncEvent event1 = events.get(0);
42       move(event1);
43       moved = true;
44       return false;
45     }
46
47     events.stream().map(AsyncEvent::getKey).forEach(keysSeen::add);
48     return true;
49   }
```

Imagine thread-1 is reading moved on line 39 while thread-2 is modifying moved on line 43. Now imagine that thread-3 is modifying moved directly from a test or subclass: *listener.moved = false;*

This creates a mess that more defensive coding could prevent (i.e., by making the member fields private and thus better encapsulated). Alternatively, if a subclass needs to reset moved, then processEvents would probably need to be synchronized. Even then, it would be safest to make moved private and keep all access to and mutation of moved within AbstractMovingAsyncEventListener, so that the concurrency (or lack of it) is fully visible when looking at that class. Having to search for how subclasses manipulate a non-private field slows the developer down and risks overlooking something unexpected.
The two subclasses of AbstractMovingAsyncEventListener in AsyncEventListenerDUnitTest do NOT currently access or mutate any of the member fields directly, so there's no reason those fields shouldn't just be private.
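A minimal sketch of the encapsulated version, assuming hypothetical stand-ins (`KeyedEvent` for `AsyncEvent`, a `String` for `DistributedMember`) so it compiles without Geode. `AbstractMovingListener`, `CountingListener`, `getKeysSeen`, `getMoveCount` and `run` are illustrative names, not existing Geode test code. Every field is private, processEvents is synchronized, and tests read a copy of keysSeen through an accessor instead of touching the field.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class EncapsulatedMovingListenerDemo {

  /** Hypothetical stand-in for Geode's AsyncEvent; only the key is used here. */
  interface KeyedEvent {
    Object getKey();
  }

  /** Sketch of AbstractMovingAsyncEventListener with every field private and guarded by "this". */
  abstract static class AbstractMovingListener {
    private final String destination; // stand-in for DistributedMember
    private boolean moved; // guarded by this
    private final Set<Object> keysSeen = new HashSet<>(); // guarded by this

    protected AbstractMovingListener(String destination) {
      this.destination = destination;
    }

    protected final String getDestination() {
      return destination;
    }

    /** Subclasses decide what "move" means; called at most once, while holding the lock. */
    protected abstract void move(KeyedEvent event);

    // synchronized: the read of moved and the write after move() form one atomic step.
    public synchronized boolean processEvents(List<KeyedEvent> events) {
      if (!moved) {
        move(events.get(0));
        moved = true;
        return false;
      }
      for (KeyedEvent event : events) {
        keysSeen.add(event.getKey());
      }
      return true;
    }

    // Tests read a copy instead of reaching into the live set.
    public synchronized Set<Object> getKeysSeen() {
      return Collections.unmodifiableSet(new HashSet<>(keysSeen));
    }
  }

  /** Hypothetical subclass that only counts how often move() runs. */
  static final class CountingListener extends AbstractMovingListener {
    private int moveCount; // written only inside move(), i.e. under the parent's lock

    CountingListener() {
      super("member-2");
    }

    @Override
    protected void move(KeyedEvent event) {
      moveCount++;
    }

    synchronized int getMoveCount() {
      return moveCount;
    }
  }

  // Calls processEvents from several threads, one single-event batch at a time.
  static CountingListener run(int threads, int eventsPerThread) throws InterruptedException {
    CountingListener listener = new CountingListener();
    List<Thread> workers = new ArrayList<>();
    for (int t = 0; t < threads; t++) {
      final int id = t;
      Thread worker = new Thread(() -> {
        for (int i = 0; i < eventsPerThread; i++) {
          String key = "key-" + id + "-" + i;
          KeyedEvent event = () -> key;
          listener.processEvents(Collections.singletonList(event));
        }
      });
      workers.add(worker);
      worker.start();
    }
    for (Thread worker : workers) {
      worker.join();
    }
    return listener;
  }

  public static void main(String[] args) throws InterruptedException {
    CountingListener listener = run(4, 100);
    // The first batch triggers the move and is not recorded, so 4 * 100 - 1 keys remain.
    System.out.println("moves=" + listener.getMoveCount() + " keysSeen=" + listener.getKeysSeen().size());
  }
}
```

Running `main` prints `moves=1 keysSeen=399`: exactly one caller performs the move, and its batch returns false without recording the key. The sketch calls move while holding the lock to keep it simple; whether that is acceptable depends on what the real move implementations do.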