[ 
https://issues.apache.org/jira/browse/GEODE-8202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17260792#comment-17260792
 ] 

ASF GitHub Bot commented on GEODE-8202:
---------------------------------------

boglesby commented on a change in pull request #5600:
URL: https://github.com/apache/geode/pull/5600#discussion_r553566356



##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
##########
@@ -146,6 +146,10 @@
    */
   private int batchSize;
 
+  private String expectedReceiverUniqueId = "";

Review comment:
       I think the expectedReceiverUniqueId should be on AbstractGatewaySender 
(like serverLocation).
   

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
##########
@@ -146,6 +146,10 @@
    */
   private int batchSize;
 
+  private String expectedReceiverUniqueId = "";
+
+  private boolean enforceThreadsConnectSameReceiver = false;
+

Review comment:
       AbstractGatewaySenderEventProcessor defines the 
enforceThreadsConnectSameReceiver attribute, but it doesn't need to, since 
AbstractGatewaySender already defines it and the processor has a reference to 
the sender. Removing this attribute will simplify some of this code.
   
   With that attribute removed, the changes to 
RemoteConcurrentSerialGatewaySenderEventProcessor and SerialGatewaySenderImpl 
would no longer be needed.
   
   ConcurrentSerialGatewaySenderEventProcessor can be changed to reference the 
value in the sender (sender.getEnforceThreadsConnectSameReceiver()).
   
   Here is a diff with those changes:
   ```
   diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
   index 1fc160ebb0..3a20e3020b 100644
   --- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
   +++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
   @@ -148,8 +148,6 @@ public abstract class 
AbstractGatewaySenderEventProcessor extends LoggingThread
    
      private String expectedReceiverUniqueId = "";
    
   -  private boolean enforceThreadsConnectSameReceiver = false;
   -
      public AbstractGatewaySenderEventProcessor(String string,
          GatewaySender sender, ThreadsMonitoring tMonitoring) {
        super(string);
   @@ -158,13 +156,6 @@ public abstract class 
AbstractGatewaySenderEventProcessor extends LoggingThread
        this.threadMonitoring = tMonitoring;
      }
    
   -  public AbstractGatewaySenderEventProcessor(String string,
   -      GatewaySender sender, ThreadsMonitoring tMonitoring,
   -      boolean enforceThreadsConnectSameReceiver) {
   -    this(string, sender, tMonitoring);
   -    this.enforceThreadsConnectSameReceiver = 
enforceThreadsConnectSameReceiver;
   -  }
   -
      public void setExpectedReceiverUniqueId(String uniqueId) {
        this.expectedReceiverUniqueId = uniqueId;
      }
   @@ -173,14 +164,6 @@ public abstract class 
AbstractGatewaySenderEventProcessor extends LoggingThread
        return this.expectedReceiverUniqueId;
      }
    
   -  public void setEnforceThreadsConnectSameReceiver(boolean value) {
   -    this.enforceThreadsConnectSameReceiver = value;
   -  }
   -
   -  public boolean getEnforceThreadsConnectSameReceiver() {
   -    return this.enforceThreadsConnectSameReceiver;
   -  }
   -
      public Object getRunningStateLock() {
        return runningStateLock;
      }
   diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
   index 9cf7487a40..7adf99640f 100644
   --- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
   +++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
   @@ -77,20 +77,6 @@ public class ConcurrentSerialGatewaySenderEventProcessor
        }
      }
    
   -  public ConcurrentSerialGatewaySenderEventProcessor(AbstractGatewaySender 
sender,
   -      ThreadsMonitoring tMonitoring, boolean cleanQueues,
   -      boolean enforceThreadsConnectSameReceiver) {
   -    super("Event Processor for GatewaySender_" + sender.getId(), sender, 
tMonitoring,
   -        enforceThreadsConnectSameReceiver);
   -    this.sender = sender;
   -
   -    initializeMessageQueue(sender.getId(), cleanQueues);
   -    queues = new HashSet<RegionQueue>();
   -    for (SerialGatewaySenderEventProcessor processor : processors) {
   -      queues.add(processor.getQueue());
   -    }
   -  }
   -
      @Override
      public int getTotalQueueSize() {
        int totalSize = 0;
   @@ -194,7 +180,7 @@ public class ConcurrentSerialGatewaySenderEventProcessor
      @Override
      public void run() {
        boolean isDebugEnabled = logger.isDebugEnabled();
   -    if (getEnforceThreadsConnectSameReceiver()) {
   +    if (this.sender.getEnforceThreadsConnectSameReceiver()) {
          this.processors.get(0).start();
          waitForRunningStatus(this.processors.get(0));
          String receiverUniqueId = 
this.processors.get(0).getExpectedReceiverUniqueId();
   @@ -206,7 +192,8 @@ public class ConcurrentSerialGatewaySenderEventProcessor
          }
        }
    
   -    for (int i = getEnforceThreadsConnectSameReceiver() ? 1 : 0; i < 
this.processors.size(); i++) {
   +    for (int i = this.sender.getEnforceThreadsConnectSameReceiver() ? 1 : 
0; i < this.processors
   +        .size(); i++) {
          if (isDebugEnabled) {
            logger.debug("Starting the serialProcessor {}", i);
          }
   diff --git 
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java
 
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java
   index 306a2a4937..7139307d7f 100644
   --- 
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java
   +++ 
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java
   @@ -30,20 +30,11 @@ public class 
RemoteConcurrentSerialGatewaySenderEventProcessor
        super(sender, tMonitoring, cleanQueues);
      }
    
   -  public 
RemoteConcurrentSerialGatewaySenderEventProcessor(AbstractGatewaySender sender,
   -      ThreadsMonitoring tMonitoring, boolean cleanQueues,
   -      boolean enforceThreadsConnectSameReceiver) {
   -    super(sender, tMonitoring, cleanQueues, 
enforceThreadsConnectSameReceiver);
   -  }
   -
      @Override
      protected void initializeMessageQueue(String id, boolean cleanQueues) {
        for (int i = 0; i < sender.getDispatcherThreads(); i++) {
   -      SerialGatewaySenderEventProcessor processor =
   -          new RemoteSerialGatewaySenderEventProcessor(this.sender, id + "." 
+ i,
   -              getThreadMonitorObj(), cleanQueues);
   -      
processor.setEnforceThreadsConnectSameReceiver(getEnforceThreadsConnectSameReceiver());
   -      processors.add(processor);
   +      processors.add(new 
RemoteSerialGatewaySenderEventProcessor(this.sender, id + "." + i,
   +          getThreadMonitorObj(), cleanQueues));
          if (logger.isDebugEnabled()) {
            logger.debug("Created the 
RemoteSerialGatewayEventProcessor_{}->{}", i, processors.get(i));
          }
   diff --git 
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
 
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
   index 7c93836667..3474b4a3c5 100644
   --- 
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
   +++ 
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
   @@ -120,9 +120,7 @@ public class SerialGatewaySenderImpl extends 
AbstractRemoteGatewaySender {
        AbstractGatewaySenderEventProcessor eventProcessor;
        if (getDispatcherThreads() > 1) {
          eventProcessor = new 
RemoteConcurrentSerialGatewaySenderEventProcessor(
   -          SerialGatewaySenderImpl.this, getThreadMonitorObj(), cleanQueues,
   -          enforceThreadsConnectSameReceiver);
   -      // 
eventProcessor.setEnforceThreadsConnectSameReceiver(enforceThreadsConnectSameReceiver);
   +          SerialGatewaySenderImpl.this, getThreadMonitorObj(), cleanQueues);
        } else {
          eventProcessor = new 
RemoteSerialGatewaySenderEventProcessor(SerialGatewaySenderImpl.this,
              getId(), getThreadMonitorObj(), cleanQueues);
   ```

##########
File path: 
geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
##########
@@ -362,11 +366,71 @@ public void destroyConnection() {
     }
   }
 
+  Connection retryInitializeConnection(Connection con) {
+    final boolean isDebugEnabled = logger.isDebugEnabled();
+    ServerLocation server = this.sender.getServerLocation();
+    String connectedServerId = con.getEndpoint().getMemberId().getUniqueId();
+    String expectedServerId = this.processor.getExpectedReceiverUniqueId();
+
+    if (expectedServerId.equals("")) {
+      if (isDebugEnabled) {
+        logger.debug("First dispatcher connected to server " + 
connectedServerId);
+      }
+      this.processor.setExpectedReceiverUniqueId(connectedServerId);
+      return con;
+    }
+
+    int attempt = 0;
+    final int attemptsPerServer = 5;

Review comment:
       Should attemptsPerServer be configurable?
   

##########
File path: 
geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
##########
@@ -362,11 +366,71 @@ public void destroyConnection() {
     }
   }
 
+  Connection retryInitializeConnection(Connection con) {
+    final boolean isDebugEnabled = logger.isDebugEnabled();
+    ServerLocation server = this.sender.getServerLocation();

Review comment:
       The server variable is unused.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> New option for serial gw sender threads start when receivers share ip and port
> ------------------------------------------------------------------------------
>
>                 Key: GEODE-8202
>                 URL: https://issues.apache.org/jira/browse/GEODE-8202
>             Project: Geode
>          Issue Type: Improvement
>            Reporter: Alberto Bustamante Reyes
>            Assignee: Alberto Bustamante Reyes
>            Priority: Major
>              Labels: pull-request-available
>
> RFC: 
> [https://cwiki.apache.org/confluence/display/GEODE/New+option+for+serial+gw+sender+dispatcher+threads+start|https://cwiki.apache.org/confluence/display/GEODE/New+option+for+serial+gw+sender+dispatcher+threads+start]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to