This is an automated email from the ASF dual-hosted git repository.

markt pushed a commit to branch 7.0.x
in repository https://gitbox.apache.org/repos/asf/tomcat.git

commit c60904f9cfd700456f11656086068b1e09ae3556
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Thu May 16 13:36:39 2019 +0100

    Fix https://bz.apache.org/bugzilla/show_bug.cgi?id=62841 poss deadlock
    
    Refactor the DeltaRequest serialization to reduce the window during
    which the DeltaSession is locked and to remove a potential cause of
    deadlocks during serialization.
---
 .../apache/catalina/ha/session/DeltaManager.java   |  58 +++++++-----
 .../apache/catalina/ha/session/DeltaSession.java   |  20 ++++
 .../tomcat/util/collections/SynchronizedStack.java | 105 +++++++++++++++++++++
 webapps/docs/changelog.xml                         |   6 ++
 4 files changed, 165 insertions(+), 24 deletions(-)

diff --git a/java/org/apache/catalina/ha/session/DeltaManager.java b/java/org/apache/catalina/ha/session/DeltaManager.java
index fc4e065..0e2a7e7 100644
--- a/java/org/apache/catalina/ha/session/DeltaManager.java
+++ b/java/org/apache/catalina/ha/session/DeltaManager.java
@@ -38,6 +38,7 @@ import org.apache.catalina.session.ManagerBase;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.io.ReplicationStream;
 import org.apache.tomcat.util.ExceptionUtils;
+import org.apache.tomcat.util.collections.SynchronizedStack;
 import org.apache.tomcat.util.res.StringManager;
 
 /**
@@ -97,7 +98,8 @@ public class DeltaManager extends ClusterManagerBase{
         new ArrayList<SessionMessage>() ;
     private boolean receiverQueue = false ;
     private boolean stateTimestampDrop = true ;
-    private long stateTransferCreateSendTime;
+    private volatile long stateTransferCreateSendTime;
+    private SynchronizedStack<DeltaRequest> deltaRequestPool = new SynchronizedStack<DeltaRequest>();
 
     // ------------------------------------------------------------------ stats attributes
 
@@ -938,26 +940,27 @@ public class DeltaManager extends ClusterManagerBase{
     @Override
     public ClusterMessage requestCompleted(String sessionId) {
          return requestCompleted(sessionId, false);
-     }
-
-     /**
-      * When the request has been completed, the replication valve will notify
-      * the manager, and the manager will decide whether any replication is
-      * needed or not. If there is a need for replication, the manager will
-      * create a session message and that will be replicated. The cluster
-      * determines where it gets sent.
-      *
-      * Session expiration also calls this method, but with expires == true.
-      *
-      * @param sessionId -
-      *            the sessionId that just completed.
-      * @param expires -
-      *            whether this method has been called during session expiration
-      * @return a SessionMessage to be sent,
-      */
-     public ClusterMessage requestCompleted(String sessionId, boolean expires) {
+    }
+
+    /**
+     * When the request has been completed, the replication valve will notify
+     * the manager, and the manager will decide whether any replication is
+     * needed or not. If there is a need for replication, the manager will
+     * create a session message and that will be replicated. The cluster
+     * determines where it gets sent.
+     *
+     * Session expiration also calls this method, but with expires == true.
+     *
+     * @param sessionId -
+     *            the sessionId that just completed.
+     * @param expires -
+     *            whether this method has been called during session expiration
+     * @return a SessionMessage to be sent,
+     */
+    public ClusterMessage requestCompleted(String sessionId, boolean expires) {
         DeltaSession session = null;
         SessionMessage msg = null;
+        DeltaRequest deltaRequest = null;
         try {
             session = (DeltaSession) findSession(sessionId);
             if (session == null) {
@@ -965,8 +968,12 @@ public class DeltaManager extends ClusterManagerBase{
                 // removed the session from the Manager.
                 return null;
             }
-            DeltaRequest deltaRequest = session.getDeltaRequest();
-            session.lock();
+            DeltaRequest newDeltaRequest = deltaRequestPool.pop();
+            if (newDeltaRequest == null) {
+                // Will be configured in replaceDeltaRequest()
+                newDeltaRequest = new DeltaRequest();
+            }
+            deltaRequest = session.replaceDeltaRequest(newDeltaRequest);
             if (deltaRequest.getSize() > 0) {
                 counterSend_EVT_SESSION_DELTA++;
                 byte[] data = serializeDeltaRequest(session,deltaRequest);
@@ -975,13 +982,16 @@ public class DeltaManager extends ClusterManagerBase{
                                              data,
                                              sessionId,
                                              sessionId + "-" + System.currentTimeMillis());
-                session.resetDeltaRequest();
             }
         } catch (IOException x) {
             log.error(sm.getString("deltaManager.createMessage.unableCreateDeltaRequest",sessionId), x);
             return null;
-        }finally {
-            if (session!=null) session.unlock();
+        } finally {
+            if (deltaRequest != null) {
+                // Reset the instance before it is returned to the pool
+                deltaRequest.reset();
+                deltaRequestPool.push(deltaRequest);
+            }
         }
         if(msg == null) {
             if(!expires && !session.isPrimarySession()) {
diff --git a/java/org/apache/catalina/ha/session/DeltaSession.java b/java/org/apache/catalina/ha/session/DeltaSession.java
index abafbf1..972a33e 100644
--- a/java/org/apache/catalina/ha/session/DeltaSession.java
+++ b/java/org/apache/catalina/ha/session/DeltaSession.java
@@ -618,6 +618,26 @@ public class DeltaSession extends StandardSession implements Externalizable,Clus
         return deltaRequest;
     }
 
+    /**
+     * Replace the existing deltaRequest with the provided replacement.
+     *
+     * @param deltaRequest The new deltaRequest. Expected to be either a newly
+     *                     created object or an instance that has been reset.
+     *
+     * @return The old deltaRequest
+     */
+    DeltaRequest replaceDeltaRequest(DeltaRequest deltaRequest) {
+        lock();
+        try {
+            DeltaRequest oldDeltaRequest = this.deltaRequest;
+            this.deltaRequest = deltaRequest;
+            this.deltaRequest.setSessionId(getIdInternal());
+            return oldDeltaRequest;
+        } finally {
+            unlock();
+        }
+    }
+
 
     // ------------------------------------------------- HttpSession Properties
 
diff --git a/java/org/apache/tomcat/util/collections/SynchronizedStack.java b/java/org/apache/tomcat/util/collections/SynchronizedStack.java
new file mode 100644
index 0000000..1af00ce
--- /dev/null
+++ b/java/org/apache/tomcat/util/collections/SynchronizedStack.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.util.collections;
+
+/**
+ * This is intended as a (mostly) GC-free alternative to
+ * {@link java.util.concurrent.ConcurrentLinkedQueue} when the requirement is to
+ * create a pool of re-usable objects with no requirement to shrink the pool.
+ * The aim is to provide the bare minimum of required functionality as quickly
+ * as possible with minimum garbage.
+ *
+ * @param <T> The type of object managed by this stack
+ */
+public class SynchronizedStack<T> {
+
+    public static final int DEFAULT_SIZE = 128;
+    private static final int DEFAULT_LIMIT = -1;
+
+    private int size;
+    private final int limit;
+
+    /*
+     * Points to the next available object in the stack
+     */
+    private int index = -1;
+
+    private Object[] stack;
+
+
+    public SynchronizedStack() {
+        this(DEFAULT_SIZE, DEFAULT_LIMIT);
+    }
+
+    public SynchronizedStack(int size, int limit) {
+        if (limit > -1 && size > limit) {
+            this.size = limit;
+        } else {
+            this.size = size;
+        }
+        this.limit = limit;
+        stack = new Object[size];
+    }
+
+
+    public synchronized boolean push(T obj) {
+        index++;
+        if (index == size) {
+            if (limit == -1 || size < limit) {
+                expand();
+            } else {
+                index--;
+                return false;
+            }
+        }
+        stack[index] = obj;
+        return true;
+    }
+
+    @SuppressWarnings("unchecked")
+    public synchronized T pop() {
+        if (index == -1) {
+            return null;
+        }
+        T result = (T) stack[index];
+        stack[index--] = null;
+        return result;
+    }
+
+    public synchronized void clear() {
+        if (index > -1) {
+            for (int i = 0; i < index + 1; i++) {
+                stack[i] = null;
+            }
+        }
+        index = -1;
+    }
+
+    private void expand() {
+        int newSize = size * 2;
+        if (limit != -1 && newSize > limit) {
+            newSize = limit;
+        }
+        Object[] newStack = new Object[newSize];
+        System.arraycopy(stack, 0, newStack, 0, size);
+        // This is the only point where garbage is created by throwing away the
+        // old array. Note it is only the array, not the contents, that becomes
+        // garbage.
+        stack = newStack;
+        size = newSize;
+    }
+}
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index 0afc589..a5c3948 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -113,6 +113,12 @@
   <subsection name="Cluster">
     <changelog>
       <fix>
+        <bug>62841</bug>: Refactor the <code>DeltaRequest</code> serialization
+        to reduce the window during which the <code>DeltaSession</code> is
+        locked and to remove a potential cause of deadlocks during
+        serialization. (markt)
+      </fix>
+      <fix>
         <bug>63441</bug>: Further streamline the processing of session creation
         messages in the <code>DeltaManager</code> to reduce the possibility of a
         session update message being processed before the session has been


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to