This is an automated email from the ASF dual-hosted git repository. markt pushed a commit to branch 9.0.x in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/9.0.x by this push: new 1311908259 BZ 66513: Enforce one concurrent request per session requirement (#623) 1311908259 is described below commit 13119082591bd38ac46a7367b3e8372650615b37 Author: Mark Thomas <ma...@apache.org> AuthorDate: Thu Jun 1 11:16:16 2023 +0200 BZ 66513: Enforce one concurrent request per session requirement (#623) --- .../apache/catalina/session/PersistentManager.java | 2 + .../apache/catalina/valves/PersistentValve.java | 338 ++++++++++++++++----- .../catalina/valves/TestPersistentValve.java | 94 ++++++ webapps/docs/changelog.xml | 6 + webapps/docs/config/valve.xml | 21 ++ 5 files changed, 385 insertions(+), 76 deletions(-) diff --git a/java/org/apache/catalina/session/PersistentManager.java b/java/org/apache/catalina/session/PersistentManager.java index 5fd7cd8ea2..815c9e38fd 100644 --- a/java/org/apache/catalina/session/PersistentManager.java +++ b/java/org/apache/catalina/session/PersistentManager.java @@ -24,6 +24,8 @@ package org.apache.catalina.session; * <li>Fault tolerance, keep sessions backed up on disk to allow recovery in the event of unplanned restarts.</li> * <li>Limit the number of active sessions kept in memory by swapping less active sessions out to disk.</li> * </ul> + * If used with a load-balancer, the load-balancer must be configured to use sticky sessions for this manager to operate + * correctly. 
* * @author Kief Morris (k...@kief.com) */ diff --git a/java/org/apache/catalina/valves/PersistentValve.java b/java/org/apache/catalina/valves/PersistentValve.java index 8e74e46e0f..bf2df77941 100644 --- a/java/org/apache/catalina/valves/PersistentValve.java +++ b/java/org/apache/catalina/valves/PersistentValve.java @@ -17,6 +17,10 @@ package org.apache.catalina.valves; import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; @@ -36,13 +40,31 @@ import org.apache.catalina.connector.Request; import org.apache.catalina.connector.Response; /** - * Valve that implements per-request session persistence. It is intended to be used with non-sticky load-balancers. + * Valve that implements per-request session persistence. It is intended to be used with non-sticky load-balancers and a + * PersistentManager. The Valve works by loading the session from the Store at the start of the request, the request + * then updates the session as required and the Valve saves the session to the Store at the end of the request. * <p> - * <b>USAGE CONSTRAINT</b>: To work correctly it requires a PersistentManager. + * To avoid conflicts and/or errors when updating the session store, each session must be accessed by no more than + * one concurrent request. The {@code filter} field can be used to define requests (e.g. those for static resources) + * that do not need access to the session and can bypass the session load/save functionality provided by this + * Valve. * <p> - * <b>USAGE CONSTRAINT</b>: To work correctly it assumes only one request exists per session at any one time.
- * - * @author Jean-Frederic Clere + * The Valve uses a per session {@code Semaphore} to ensure that each session is accessed by no more than one request at + * a time within a single Tomcat instance. The behaviour if multiple requests try to access the session concurrently can + * be controlled by the {@code semaphoreFairness}, {@code semaphoreBlockOnAcquire} and {@code + * semaphoreAcquireUninterruptibly} fields. If a request fails to obtain the Semaphore, the response is generated by the + * {@link #onSemaphoreNotAcquired(Request, Response)} method which, by default, returns a {@code 429} status code. + * <p> + * The per session Semaphores only provide limited protection against concurrent requests within a single Tomcat + * instance. If multiple requests access the same session concurrently across different Tomcat instances, update + * conflicts and/or session data loss and/or errors are very likely. + * <p> + * <b>USAGE CONSTRAINTS</b>: + * <ul> + * <li>This Valve must only be used with a PersistentManager</li> + * <li>The client must ensure that no more than one concurrent request accesses a session at any time across all Tomcat + * instances</li> + * </ul> */ public class PersistentValve extends ValveBase { @@ -55,15 +77,20 @@ public class PersistentValve extends ValveBase { protected Pattern filter = null; - // ------------------------------------------------------ Constructor + private ConcurrentMap<String,UsageCountingSemaphore> sessionToSemaphoreMap = new ConcurrentHashMap<>(); + + private boolean semaphoreFairness = true; + + private boolean semaphoreBlockOnAcquire = true; + + private boolean semaphoreAcquireUninterruptibly = true; + public PersistentValve() { super(true); } - // --------------------------------------------------------- Public Methods - @Override public void setContainer(Container container) { super.setContainer(container); @@ -101,72 +128,99 @@ public class PersistentValve extends ValveBase { return; } - // Update the session last 
access time for our session (if any) String sessionId = request.getRequestedSessionId(); - Manager manager = context.getManager(); - if (sessionId != null && manager instanceof StoreManager) { - Store store = ((StoreManager) manager).getStore(); - if (store != null) { - Session session = null; - try { - session = store.load(sessionId); - } catch (Exception e) { - container.getLogger().error("deserializeError"); - } - if (session != null) { - if (!session.isValid() || isSessionStale(session, System.currentTimeMillis())) { - if (container.getLogger().isDebugEnabled()) { - container.getLogger().debug("session swapped in is invalid or expired"); - } - session.expire(); - store.remove(sessionId); + UsageCountingSemaphore semaphore = null; + boolean mustReleaseSemaphore = true; + + try { + // Acquire the per session semaphore + if (sessionId != null) { + semaphore = sessionToSemaphoreMap.compute(sessionId, + (k, v) -> v == null ? new UsageCountingSemaphore(semaphoreFairness) : v.incrementUsageCount()); + if (semaphoreBlockOnAcquire) { + if (semaphoreAcquireUninterruptibly) { + semaphore.acquireUninterruptibly(); } else { - session.setManager(manager); - // session.setId(sessionId); Only if new ??? - manager.add(session); - // ((StandardSession)session).activate(); - session.access(); - session.endAccess(); + try { + semaphore.acquire(); + } catch (InterruptedException e) { + mustReleaseSemaphore = false; + onSemaphoreNotAcquired(request, response); + return; + } + } + } else { + if (!semaphore.tryAcquire()) { + onSemaphoreNotAcquired(request, response); + return; } } } - } - if (container.getLogger().isDebugEnabled()) { - container.getLogger().debug("sessionId: " + sessionId); - } - - // Ask the next valve to process the request. - getNext().invoke(request, response); - // If still processing async, don't try to store the session - if (!request.isAsync()) { - // Read the sessionid after the response. 
- // HttpSession hsess = hreq.getSession(false); - Session hsess; - try { - hsess = request.getSessionInternal(false); - } catch (Exception ex) { - hsess = null; - } - String newsessionId = null; - if (hsess != null) { - newsessionId = hsess.getIdInternal(); + // Update the session last access time for our session (if any) + Manager manager = context.getManager(); + if (sessionId != null && manager instanceof StoreManager) { + Store store = ((StoreManager) manager).getStore(); + if (store != null) { + Session session = null; + try { + session = store.load(sessionId); + } catch (Exception e) { + container.getLogger().error("deserializeError"); + } + if (session != null) { + if (!session.isValid() || isSessionStale(session, System.currentTimeMillis())) { + if (container.getLogger().isDebugEnabled()) { + container.getLogger().debug("session swapped in is invalid or expired"); + } + session.expire(); + store.remove(sessionId); + } else { + session.setManager(manager); + // session.setId(sessionId); Only if new ??? + manager.add(session); + // ((StandardSession)session).activate(); + session.access(); + session.endAccess(); + } + } + } } - if (container.getLogger().isDebugEnabled()) { - container.getLogger().debug("newsessionId: " + newsessionId); + container.getLogger().debug("sessionId: " + sessionId); } - if (newsessionId != null) { + + // Ask the next valve to process the request. + getNext().invoke(request, response); + + // If still processing async, don't try to store the session + if (!request.isAsync()) { + // Read the sessionid after the response. 
+ // HttpSession hsess = hreq.getSession(false); + Session hsess; try { - bind(context); - - /* store the session and remove it from the manager */ - if (manager instanceof StoreManager) { - Session session = manager.findSession(newsessionId); - Store store = ((StoreManager) manager).getStore(); - boolean stored = false; - if (session != null) { - synchronized (session) { + hsess = request.getSessionInternal(false); + } catch (Exception ex) { + hsess = null; + } + String newsessionId = null; + if (hsess != null) { + newsessionId = hsess.getIdInternal(); + } + + if (container.getLogger().isDebugEnabled()) { + container.getLogger().debug("newsessionId: " + newsessionId); + } + if (newsessionId != null) { + try { + bind(context); + + /* store the session and remove it from the manager */ + if (manager instanceof StoreManager) { + Session session = manager.findSession(newsessionId); + Store store = ((StoreManager) manager).getStore(); + boolean stored = false; + if (session != null) { if (store != null && session.isValid() && !isSessionStale(session, System.currentTimeMillis())) { store.save(session); @@ -175,28 +229,51 @@ public class PersistentValve extends ValveBase { stored = true; } } - } - if (!stored) { + if (!stored) { + if (container.getLogger().isDebugEnabled()) { + container.getLogger() + .debug("newsessionId store: " + store + " session: " + session + + " valid: " + + (session == null ? "N/A" : Boolean.toString(session.isValid())) + + " stale: " + isSessionStale(session, System.currentTimeMillis())); + } + } + } else { if (container.getLogger().isDebugEnabled()) { - container.getLogger() - .debug("newsessionId store: " + store + " session: " + session + " valid: " + - (session == null ? 
"N/A" : Boolean.toString(session.isValid())) + - " stale: " + isSessionStale(session, System.currentTimeMillis())); + container.getLogger().debug("newsessionId Manager: " + manager); } } - } else { - if (container.getLogger().isDebugEnabled()) { - container.getLogger().debug("newsessionId Manager: " + manager); - } + } finally { + unbind(context); } - } finally { - unbind(context); } } + } finally { + if (semaphore != null) { + if (mustReleaseSemaphore) { + semaphore.release(); + } + sessionToSemaphoreMap.computeIfPresent(sessionId, + (k, v) -> v.decrementAndGetUsageCount() == 0 ? null : v); + } } } + /** + * Handle the case where a semaphore cannot be obtained. The default behaviour is to return a 429 (too many + * requests) status code. + * + * @param request The request that will not be processed + * @param response The response that will be used for this request + * + * @throws IOException If an I/O error occurs while working with the request or response + */ + protected void onSemaphoreNotAcquired(Request request, Response response) throws IOException { + response.sendError(429); + } + + /** * Indicate whether the session has been idle for longer than its expiration date as of the supplied time. FIXME: * Probably belongs in the Session class. @@ -258,4 +335,113 @@ public class PersistentValve extends ValveBase { } } } + + + /** + * If multiple threads attempt to acquire the same per session Semaphore, will permits be granted in the same order + * they were requested? + * + * @return {@code true} if fairness is enabled, otherwise {@code false} + */ + public boolean isSemaphoreFairness() { + return semaphoreFairness; + } + + + /** + * Configure whether the per session Semaphores will handle granting of permits in the same order they were + * requested if multiple threads attempt to acquire the same Semaphore. 
+ * + * @param semaphoreFairness {@code true} if permits should be granted in the same order they are requested, + * otherwise {@code false} + */ + public void setSemaphoreFairness(boolean semaphoreFairness) { + this.semaphoreFairness = semaphoreFairness; + } + + + /** + * If a thread attempts to acquire the per session Semaphore while it is being used by another request, should the + * thread block to wait for the Semaphore or should the request be rejected? + * + * @return {@code true} if the thread should block, otherwise {@code false} to reject the concurrent request + */ + public boolean isSemaphoreBlockOnAcquire() { + return semaphoreBlockOnAcquire; + } + + + /** + * Configure whether a thread should block and wait for the per session Semaphore or reject the request if the + * Semaphore is being used by another request. + * + * @param semaphoreBlockOnAcquire {@code true} to block, otherwise {@code false} + */ + public void setSemaphoreBlockOnAcquire(boolean semaphoreBlockOnAcquire) { + this.semaphoreBlockOnAcquire = semaphoreBlockOnAcquire; + } + + + /** + * If a thread is blocking to acquire a per session Semaphore, can that thread be interrupted? + * + * @return {@code true} if the thread can <b>not</b> be interrupted, otherwise {@code false}. + */ + public boolean isSemaphoreAcquireUninterruptibly() { + return semaphoreAcquireUninterruptibly; + } + + + /** + * Configure whether a thread blocking to acquire a per session Semaphore can be interrupted. + * + * @param semaphoreAcquireUninterruptibly {@code true} if the thread can <b>not</b> be interrupted, otherwise + * {@code false}. + */ + public void setSemaphoreAcquireUninterruptibly(boolean semaphoreAcquireUninterruptibly) { + this.semaphoreAcquireUninterruptibly = semaphoreAcquireUninterruptibly; + } + + + /* + * The PersistentValve uses a per session semaphore to ensure that only one request accesses a session at a time. 
To + * limit the size of the session ID to Semaphore map, the Semaphores are created when required and destroyed (made + * eligible for GC) as soon as they are not required. Tracking usage in a thread-safe way requires a usage counter + * that does not block. The Semaphore's internal tracking can't be used because the only way to increment usage is + * via the acquire methods and they block. Therefore, this class was created which uses a separate AtomicLong + * to track usage. + */ + private static class UsageCountingSemaphore { + private final AtomicLong usageCount = new AtomicLong(1); + private final Semaphore semaphore; + + private UsageCountingSemaphore(boolean fairness) { + semaphore = new Semaphore(1, fairness); + } + + private UsageCountingSemaphore incrementUsageCount() { + usageCount.incrementAndGet(); + return this; + } + + private long decrementAndGetUsageCount() { + return usageCount.decrementAndGet(); + } + + private void acquire() throws InterruptedException { + semaphore.acquire(); + } + + private void acquireUninterruptibly() { + semaphore.acquireUninterruptibly(); + } + + private boolean tryAcquire() { + return semaphore.tryAcquire(); + } + + private void release() { + semaphore.release(); + } + } } diff --git a/test/org/apache/catalina/valves/TestPersistentValve.java b/test/org/apache/catalina/valves/TestPersistentValve.java new file mode 100644 index 0000000000..b53f85c05e --- /dev/null +++ b/test/org/apache/catalina/valves/TestPersistentValve.java @@ -0,0 +1,94 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.catalina.valves; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.servlet.ServletException; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.catalina.connector.Request; +import org.apache.catalina.connector.Response; +import org.apache.tomcat.unittest.TesterRequest; +import org.apache.tomcat.unittest.TesterResponse; + +public class TestPersistentValve { + + @Test + public void testSemaphore() throws Exception { + // Create the test objects + PersistentValve pv = new PersistentValve(); + Request request = new TesterRequest(); + Response response = new TesterResponse(); + TesterValve testerValve = new TesterValve(); + + // Configure the test objects + request.setRequestedSessionId("1234"); + + // Plumb the test objects together + pv.setContainer(request.getContext()); + pv.setNext(testerValve); + + // Run the test + Thread[] threads = new Thread[5]; + + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(() -> { + try { + pv.invoke(request, response); + } catch (IOException | ServletException e) { + throw new RuntimeException(e); + } + }); + } + + for (int i = 0; i < threads.length; i++) { + threads[i].start(); + } + + for (int i = 0; i < threads.length; i++) { + threads[i].join(); + } + + Assert.assertEquals(1, testerValve.getMaximumConcurrency()); + } + + + private static class TesterValve extends ValveBase { + + private static AtomicInteger maximumConcurrency = new AtomicInteger(); + private static AtomicInteger concurrency = new 
AtomicInteger(); + + @Override + public void invoke(Request request, Response response) throws IOException, ServletException { + int c = concurrency.incrementAndGet(); + maximumConcurrency.getAndUpdate((v) -> c > v ? c : v); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // Ignore + } + concurrency.decrementAndGet(); + } + + public int getMaximumConcurrency() { + return maximumConcurrency.get(); + } + } +} diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml index b0ae0efbc6..b9a4603707 100644 --- a/webapps/docs/changelog.xml +++ b/webapps/docs/changelog.xml @@ -123,6 +123,12 @@ virtual threads. This Executor requires a minimum Java version of Java 21. (markt) </add> + <fix> + <bug>66513</bug>: Partial fix that adds a per session Semaphore to the + <code>PersistentValve</code> that ensures that, within a single Tomcat + instance, there is no more than one concurrent request per session. + (markt) + </fix> <fix> <bug>66609</bug>: Ensure that the default servlet correctly escapes file names in directory listings when using XML output. Based on pull diff --git a/webapps/docs/config/valve.xml b/webapps/docs/config/valve.xml index 056533b546..c796dd0131 100644 --- a/webapps/docs/config/valve.xml +++ b/webapps/docs/config/valve.xml @@ -2637,6 +2637,27 @@ <code>java.util.regex</code>.</p> </attribute> + <attribute name="semaphoreAcquireUninterruptibly" required="false"> + <p>Flag to determine if a thread that blocks waiting for the per session + Semaphore should do so uninterruptibly. Has no effect if + <strong>semaphoreBlockOnAcquire</strong> is <code>false</code>. If not + specified, the default value of <code>true</code> will be used.</p> + </attribute> + + <attribute name="semaphoreBlockOnAcquire" required="false"> + <p>Flag to determine if a thread that wishes to acquire the per session + Semaphore when it is held by another thread should block until it can + acquire the Semaphore or if the waiting request should be rejected.
If not + specified, the default value of <code>true</code> will be used.</p> + </attribute> + + <attribute name="semaphoreFairness" required="false"> + <p>Flag to determine if the per session Semaphore will grant requests + for the Semaphore in the same order they were received. Has no effect if + <strong>semaphoreBlockOnAcquire</strong> is <code>false</code>. If not + specified, the default value of <code>true</code> will be used.</p> + </attribute> + </attributes> </subsection> --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org