This is an automated email from the ASF dual-hosted git repository. markt pushed a commit to branch 7.0.x in repository https://gitbox.apache.org/repos/asf/tomcat.git
commit c60904f9cfd700456f11656086068b1e09ae3556 Author: Mark Thomas <ma...@apache.org> AuthorDate: Thu May 16 13:36:39 2019 +0100 Fix https://bz.apache.org/bugzilla/show_bug.cgi?id=62841 poss deadlock Refactor the DeltaRequest serialization to reduce the window during which the DeltaSession is locked and to remove a potential cause of deadlocks during serialization. --- .../apache/catalina/ha/session/DeltaManager.java | 58 +++++++----- .../apache/catalina/ha/session/DeltaSession.java | 20 ++++ .../tomcat/util/collections/SynchronizedStack.java | 105 +++++++++++++++++++++ webapps/docs/changelog.xml | 6 ++ 4 files changed, 165 insertions(+), 24 deletions(-) diff --git a/java/org/apache/catalina/ha/session/DeltaManager.java b/java/org/apache/catalina/ha/session/DeltaManager.java index fc4e065..0e2a7e7 100644 --- a/java/org/apache/catalina/ha/session/DeltaManager.java +++ b/java/org/apache/catalina/ha/session/DeltaManager.java @@ -38,6 +38,7 @@ import org.apache.catalina.session.ManagerBase; import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.io.ReplicationStream; import org.apache.tomcat.util.ExceptionUtils; +import org.apache.tomcat.util.collections.SynchronizedStack; import org.apache.tomcat.util.res.StringManager; /** @@ -97,7 +98,8 @@ public class DeltaManager extends ClusterManagerBase{ new ArrayList<SessionMessage>() ; private boolean receiverQueue = false ; private boolean stateTimestampDrop = true ; - private long stateTransferCreateSendTime; + private volatile long stateTransferCreateSendTime; + private SynchronizedStack<DeltaRequest> deltaRequestPool = new SynchronizedStack<DeltaRequest>(); // ------------------------------------------------------------------ stats attributes @@ -938,26 +940,27 @@ public class DeltaManager extends ClusterManagerBase{ @Override public ClusterMessage requestCompleted(String sessionId) { return requestCompleted(sessionId, false); - } - - /** - * When the request has been completed, the replication valve will notify - * the manager, and the manager will decide whether any replication is - * needed or not. If there is a need for replication, the manager will - * create a session message and that will be replicated. The cluster - * determines where it gets sent. - * - * Session expiration also calls this method, but with expires == true. - * - * @param sessionId - - * the sessionId that just completed. - * @param expires - - * whether this method has been called during session expiration - * @return a SessionMessage to be sent, - */ - public ClusterMessage requestCompleted(String sessionId, boolean expires) { + } + + /** + * When the request has been completed, the replication valve will notify + * the manager, and the manager will decide whether any replication is + * needed or not. If there is a need for replication, the manager will + * create a session message and that will be replicated. The cluster + * determines where it gets sent. + * + * Session expiration also calls this method, but with expires == true. + * + * @param sessionId - + * the sessionId that just completed. + * @param expires - + * whether this method has been called during session expiration + * @return a SessionMessage to be sent, + */ + public ClusterMessage requestCompleted(String sessionId, boolean expires) { DeltaSession session = null; SessionMessage msg = null; + DeltaRequest deltaRequest = null; try { session = (DeltaSession) findSession(sessionId); if (session == null) { @@ -965,8 +968,12 @@ public class DeltaManager extends ClusterManagerBase{ // removed the session from the Manager. return null; } - DeltaRequest deltaRequest = session.getDeltaRequest(); - session.lock(); + DeltaRequest newDeltaRequest = deltaRequestPool.pop(); + if (newDeltaRequest == null) { + // Will be configured in replaceDeltaRequest() + newDeltaRequest = new DeltaRequest(); + } + deltaRequest = session.replaceDeltaRequest(newDeltaRequest); if (deltaRequest.getSize() > 0) { counterSend_EVT_SESSION_DELTA++; byte[] data = serializeDeltaRequest(session,deltaRequest); @@ -975,13 +982,16 @@ public class DeltaManager extends ClusterManagerBase{ data, sessionId, sessionId + "-" + System.currentTimeMillis()); - session.resetDeltaRequest(); } } catch (IOException x) { log.error(sm.getString("deltaManager.createMessage.unableCreateDeltaRequest",sessionId), x); return null; - }finally { - if (session!=null) session.unlock(); + } finally { + if (deltaRequest != null) { + // Reset the instance before it is returned to the pool + deltaRequest.reset(); + deltaRequestPool.push(deltaRequest); + } } if(msg == null) { if(!expires && !session.isPrimarySession()) { diff --git a/java/org/apache/catalina/ha/session/DeltaSession.java b/java/org/apache/catalina/ha/session/DeltaSession.java index abafbf1..972a33e 100644 --- a/java/org/apache/catalina/ha/session/DeltaSession.java +++ b/java/org/apache/catalina/ha/session/DeltaSession.java @@ -618,6 +618,26 @@ public class DeltaSession extends StandardSession implements Externalizable,Clus return deltaRequest; } + /** + * Replace the existing deltaRequest with the provided replacement. + * + * @param deltaRequest The new deltaRequest. Expected to be either a newly + * created object or an instance that has been reset. + * + * @return The old deltaRequest + */ + DeltaRequest replaceDeltaRequest(DeltaRequest deltaRequest) { + lock(); + try { + DeltaRequest oldDeltaRequest = this.deltaRequest; + this.deltaRequest = deltaRequest; + this.deltaRequest.setSessionId(getIdInternal()); + return oldDeltaRequest; + } finally { + unlock(); + } + } + // ------------------------------------------------- HttpSession Properties diff --git a/java/org/apache/tomcat/util/collections/SynchronizedStack.java b/java/org/apache/tomcat/util/collections/SynchronizedStack.java new file mode 100644 index 0000000..1af00ce --- /dev/null +++ b/java/org/apache/tomcat/util/collections/SynchronizedStack.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tomcat.util.collections; + +/** + * This is intended as a (mostly) GC-free alternative to + * {@link java.util.concurrent.ConcurrentLinkedQueue} when the requirement is to + * create a pool of re-usable objects with no requirement to shrink the pool. + * The aim is to provide the bare minimum of required functionality as quickly + * as possible with minimum garbage. + * + * @param <T> The type of object managed by this stack + */ +public class SynchronizedStack<T> { + + public static final int DEFAULT_SIZE = 128; + private static final int DEFAULT_LIMIT = -1; + + private int size; + private final int limit; + + /* + * Points to the next available object in the stack + */ + private int index = -1; + + private Object[] stack; + + + public SynchronizedStack() { + this(DEFAULT_SIZE, DEFAULT_LIMIT); + } + + public SynchronizedStack(int size, int limit) { + if (limit > -1 && size > limit) { + this.size = limit; + } else { + this.size = size; + } + this.limit = limit; + stack = new Object[size]; + } + + + public synchronized boolean push(T obj) { + index++; + if (index == size) { + if (limit == -1 || size < limit) { + expand(); + } else { + index--; + return false; + } + } + stack[index] = obj; + return true; + } + + @SuppressWarnings("unchecked") + public synchronized T pop() { + if (index == -1) { + return null; + } + T result = (T) stack[index]; + stack[index--] = null; + return result; + } + + public synchronized void clear() { + if (index > -1) { + for (int i = 0; i < index + 1; i++) { + stack[i] = null; + } + } + index = -1; + } + + private void expand() { + int newSize = size * 2; + if (limit != -1 && newSize > limit) { + newSize = limit; + } + Object[] newStack = new Object[newSize]; + System.arraycopy(stack, 0, newStack, 0, size); + // This is the only point where garbage is created by throwing away the + // old array. Note it is only the array, not the contents, that becomes + // garbage. + stack = newStack; + size = newSize; + } +} diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml index 0afc589..a5c3948 100644 --- a/webapps/docs/changelog.xml +++ b/webapps/docs/changelog.xml @@ -113,6 +113,12 @@ <subsection name="Cluster"> <changelog> <fix> + <bug>62841</bug>: Refactor the <code>DeltaRequest</code> serialization + to reduce the window during which the <code>DeltaSession</code> is + locked and to remove a potential cause of deadlocks during + serialization. (markt) + </fix> + <fix> <bug>63441</bug>: Further streamline the processing of session creation messages in the <code>DeltaManager</code> to reduce the possibility of a session update message being processed before the session has been --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org