[ https://issues.apache.org/jira/browse/GEODE-8202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17260792#comment-17260792 ]
ASF GitHub Bot commented on GEODE-8202: --------------------------------------- boglesby commented on a change in pull request #5600: URL: https://github.com/apache/geode/pull/5600#discussion_r553566356 ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java ########## @@ -146,6 +146,10 @@ */ private int batchSize; + private String expectedReceiverUniqueId = ""; Review comment: I think the expectedReceiverUniqueId should be on AbstractGatewaySender (like serverLocation). ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java ########## @@ -146,6 +146,10 @@ */ private int batchSize; + private String expectedReceiverUniqueId = ""; + + private boolean enforceThreadsConnectSameReceiver = false; + Review comment: AbstractGatewaySenderEventProcessor defines enforceThreadsConnectSameReceiver, but it doesn't need to, since AbstractGatewaySender already defines it and the processor has a reference to the sender. Removing this attribute will simplify some of this code. The changes to RemoteConcurrentSerialGatewaySenderEventProcessor and SerialGatewaySenderImpl would be eliminated. ConcurrentSerialGatewaySenderEventProcessor can be changed to reference the value in the sender (sender.getEnforceThreadsConnectSameReceiver()). 
Here is a diff with those changes: ``` diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java index 1fc160ebb0..3a20e3020b 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java @@ -148,8 +148,6 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread private String expectedReceiverUniqueId = ""; - private boolean enforceThreadsConnectSameReceiver = false; - public AbstractGatewaySenderEventProcessor(String string, GatewaySender sender, ThreadsMonitoring tMonitoring) { super(string); @@ -158,13 +156,6 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread this.threadMonitoring = tMonitoring; } - public AbstractGatewaySenderEventProcessor(String string, - GatewaySender sender, ThreadsMonitoring tMonitoring, - boolean enforceThreadsConnectSameReceiver) { - this(string, sender, tMonitoring); - this.enforceThreadsConnectSameReceiver = enforceThreadsConnectSameReceiver; - } - public void setExpectedReceiverUniqueId(String uniqueId) { this.expectedReceiverUniqueId = uniqueId; } @@ -173,14 +164,6 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread return this.expectedReceiverUniqueId; } - public void setEnforceThreadsConnectSameReceiver(boolean value) { - this.enforceThreadsConnectSameReceiver = value; - } - - public boolean getEnforceThreadsConnectSameReceiver() { - return this.enforceThreadsConnectSameReceiver; - } - public Object getRunningStateLock() { return runningStateLock; } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java index 9cf7487a40..7adf99640f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java @@ -77,20 +77,6 @@ public class ConcurrentSerialGatewaySenderEventProcessor } } - public ConcurrentSerialGatewaySenderEventProcessor(AbstractGatewaySender sender, - ThreadsMonitoring tMonitoring, boolean cleanQueues, - boolean enforceThreadsConnectSameReceiver) { - super("Event Processor for GatewaySender_" + sender.getId(), sender, tMonitoring, - enforceThreadsConnectSameReceiver); - this.sender = sender; - - initializeMessageQueue(sender.getId(), cleanQueues); - queues = new HashSet<RegionQueue>(); - for (SerialGatewaySenderEventProcessor processor : processors) { - queues.add(processor.getQueue()); - } - } - @Override public int getTotalQueueSize() { int totalSize = 0; @@ -194,7 +180,7 @@ public class ConcurrentSerialGatewaySenderEventProcessor @Override public void run() { boolean isDebugEnabled = logger.isDebugEnabled(); - if (getEnforceThreadsConnectSameReceiver()) { + if (this.sender.getEnforceThreadsConnectSameReceiver()) { this.processors.get(0).start(); waitForRunningStatus(this.processors.get(0)); String receiverUniqueId = this.processors.get(0).getExpectedReceiverUniqueId(); @@ -206,7 +192,8 @@ public class ConcurrentSerialGatewaySenderEventProcessor } } - for (int i = getEnforceThreadsConnectSameReceiver() ? 1 : 0; i < this.processors.size(); i++) { + for (int i = this.sender.getEnforceThreadsConnectSameReceiver() ? 
1 : 0; i < this.processors + .size(); i++) { if (isDebugEnabled) { logger.debug("Starting the serialProcessor {}", i); } diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java index 306a2a4937..7139307d7f 100644 --- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java @@ -30,20 +30,11 @@ public class RemoteConcurrentSerialGatewaySenderEventProcessor super(sender, tMonitoring, cleanQueues); } - public RemoteConcurrentSerialGatewaySenderEventProcessor(AbstractGatewaySender sender, - ThreadsMonitoring tMonitoring, boolean cleanQueues, - boolean enforceThreadsConnectSameReceiver) { - super(sender, tMonitoring, cleanQueues, enforceThreadsConnectSameReceiver); - } - @Override protected void initializeMessageQueue(String id, boolean cleanQueues) { for (int i = 0; i < sender.getDispatcherThreads(); i++) { - SerialGatewaySenderEventProcessor processor = - new RemoteSerialGatewaySenderEventProcessor(this.sender, id + "." + i, - getThreadMonitorObj(), cleanQueues); - processor.setEnforceThreadsConnectSameReceiver(getEnforceThreadsConnectSameReceiver()); - processors.add(processor); + processors.add(new RemoteSerialGatewaySenderEventProcessor(this.sender, id + "." 
+ i, + getThreadMonitorObj(), cleanQueues)); if (logger.isDebugEnabled()) { logger.debug("Created the RemoteSerialGatewayEventProcessor_{}->{}", i, processors.get(i)); } diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java index 7c93836667..3474b4a3c5 100644 --- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java @@ -120,9 +120,7 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender { AbstractGatewaySenderEventProcessor eventProcessor; if (getDispatcherThreads() > 1) { eventProcessor = new RemoteConcurrentSerialGatewaySenderEventProcessor( - SerialGatewaySenderImpl.this, getThreadMonitorObj(), cleanQueues, - enforceThreadsConnectSameReceiver); - // eventProcessor.setEnforceThreadsConnectSameReceiver(enforceThreadsConnectSameReceiver); + SerialGatewaySenderImpl.this, getThreadMonitorObj(), cleanQueues); } else { eventProcessor = new RemoteSerialGatewaySenderEventProcessor(SerialGatewaySenderImpl.this, getId(), getThreadMonitorObj(), cleanQueues); ``` ########## File path: geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java ########## @@ -362,11 +366,71 @@ public void destroyConnection() { } } + Connection retryInitializeConnection(Connection con) { + final boolean isDebugEnabled = logger.isDebugEnabled(); + ServerLocation server = this.sender.getServerLocation(); + String connectedServerId = con.getEndpoint().getMemberId().getUniqueId(); + String expectedServerId = this.processor.getExpectedReceiverUniqueId(); + + if (expectedServerId.equals("")) { + if (isDebugEnabled) { + logger.debug("First dispatcher connected to server " + connectedServerId); + } + 
this.processor.setExpectedReceiverUniqueId(connectedServerId); + return con; + } + + int attempt = 0; + final int attemptsPerServer = 5; Review comment: Should attemptsPerServer be configurable? ########## File path: geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java ########## @@ -362,11 +366,71 @@ public void destroyConnection() { } } + Connection retryInitializeConnection(Connection con) { + final boolean isDebugEnabled = logger.isDebugEnabled(); + ServerLocation server = this.sender.getServerLocation(); Review comment: The server variable is unused. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > New option for serial gw sender threads start when receivers share ip and port > ------------------------------------------------------------------------------ > > Key: GEODE-8202 > URL: https://issues.apache.org/jira/browse/GEODE-8202 > Project: Geode > Issue Type: Improvement > Reporter: Alberto Bustamante Reyes > Assignee: Alberto Bustamante Reyes > Priority: Major > Labels: pull-request-available > > RFC: > [https://cwiki.apache.org/confluence/display/GEODE/New+option+for+serial+gw+sender+dispatcher+threads+start|https://cwiki.apache.org/confluence/display/GEODE/New+option+for+serial+gw+sender+dispatcher+threads+start] -- This message was sent by Atlassian Jira (v8.3.4#803005)