[ https://issues.apache.org/jira/browse/GEODE-7971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17103022#comment-17103022 ]
ASF GitHub Bot commented on GEODE-7971: --------------------------------------- DonalEvans commented on a change in pull request #4928: URL: https://github.com/apache/geode/pull/4928#discussion_r422349973 ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/TXLastEventInTransactionUtils.java ########## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import java.util.Collection; +import java.util.List; +import java.util.ServiceConfigurationError; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.wan.GatewaySender; +import org.apache.geode.logging.internal.log4j.api.LogService; + +public class TXLastEventInTransactionUtils { + private static final Logger logger = LogService.getLogger(); + + /** + * @param callbacks list of events belonging to a transaction + * + * @return the last event of the transaction. + * If the regions to which the events belong do not have senders + * that group transactions it returns null. + * If the regions to which the + * events belong have different sets of senders that group transactions + * then it throws a ServiceConfigurationError exception. + */ + public static EntryEventImpl getLastTransactionEvent(List<EntryEventImpl> callbacks, + Cache cache) + throws ServiceConfigurationError { + if (checkNoSendersGroupTransactionEvents(callbacks, cache)) { + return null; + } + + List<Set> senderIdsPerEvent = getGroupingSendersPerEvent(callbacks, cache); + if (senderIdsPerEvent.stream().distinct().count() > 1) { + String info = eventsAndSendersPerEventToString(callbacks, senderIdsPerEvent); + throw new ServiceConfigurationError( + "Not all events go to the same senders that group transactions. " + info); + } ; Review comment: Unnecessary semicolon here. ########## File path: geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java ########## @@ -186,6 +188,109 @@ public void isDREventReturnsFalseForPartitionedRegionEvent() { assertThat(queue.isDREvent(cache, event)).isFalse(); } + @Test + public void peekedExtraEventsWhenIsGroupTransactionEvents() + throws Exception { + + GatewaySenderEventImpl event1 = createGatewaySenderEventImpl(1, false); + GatewaySenderEventImpl event2 = createGatewaySenderEventImpl(2, false); + GatewaySenderEventImpl event3 = createGatewaySenderEventImpl(1, true); + GatewaySenderEventImpl event4 = createGatewaySenderEventImpl(2, true); + GatewaySenderEventImpl event5 = createGatewaySenderEventImpl(3, false); + GatewaySenderEventImpl event6 = createGatewaySenderEventImpl(3, true); + + Queue backingList = new LinkedList(); + backingList.add(event1); + backingList.add(event2); + backingList.add(event3); + backingList.add(event4); + backingList.add(event5); + backingList.add(event6); + + BucketRegionQueue bucketRegionQueue = mockBucketRegionQueue(backingList); + + TestableParallelGatewaySenderQueue queue = new TestableParallelGatewaySenderQueue(sender, + Collections.emptySet(), 0, 1, metaRegionFactory); + queue.setGroupTransactionEvents(true); + queue.setMockedAbstractBucketRegionQueue(bucketRegionQueue); + + List peeked = queue.peek(3, 1000); + assertEquals(4, peeked.size()); + List peekedAfter = queue.peek(3, 1000); + assertEquals(2, peekedAfter.size()); + } + + @Test + public void peekedExtraEventsWhenIsGroupTransactionEventsAndTimeout() Review comment: This test name could be more descriptive. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Gateway sender to deliver transaction events atomically to gateway receivers > ---------------------------------------------------------------------------- > > Key: GEODE-7971 > URL: https://issues.apache.org/jira/browse/GEODE-7971 > Project: Geode > Issue Type: Improvement > Components: wan > Reporter: Alberto Gomez > Assignee: Alberto Gomez > Priority: Major > Time Spent: 50m > Remaining Estimate: 0h > > The goal of this ticket is to implement the necessary changes in the gateway > sender to prevent that events belonging to the same transaction are spread > across different batches. In other words, to ensure that events from the same > transaction are sent inside the same batch. > This will be an optional feature on gateway senders to be enabled via a new > parameter (--group-transaction-events) and will be restricted to serial > gateway senders with just one dispatcher thread or to parallel gateway > senders. > Apart from the above restriction, grouping of events for a transaction inside > the same batch may only be attained if the regions to which the events belong > are replicated by the same set of gateway senders with the > --group-transaction-events flag enabled. If this condition is not met, the > events will be correctly delivered by the gateway senders but it will not be > guaranteed that all events will always be sent inside the same batch. > For more details see: > [https://cwiki.apache.org/confluence/display/GEODE/Gw+sender+to+deliver+transaction+events+atomically+to+receivers] > -- This message was sent by Atlassian Jira (v8.3.4#803005)