Author: markt Date: Tue Oct 20 09:00:08 2015 New Revision: 1709543 URL: http://svn.apache.org/viewvc?rev=1709543&view=rev Log: Move the additional dispatches required from the SocketWrapper to the Processor
Modified: tomcat/trunk/java/org/apache/coyote/AbstractProcessorLight.java tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java tomcat/trunk/java/org/apache/coyote/Processor.java tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorExternal.java tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletInputStream.java tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java Modified: tomcat/trunk/java/org/apache/coyote/AbstractProcessorLight.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AbstractProcessorLight.java?rev=1709543&r1=1709542&r2=1709543&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/AbstractProcessorLight.java (original) +++ tomcat/trunk/java/org/apache/coyote/AbstractProcessorLight.java Tue Oct 20 09:00:08 2015 @@ -16,6 +16,12 @@ */ package org.apache.coyote; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; + +import org.apache.tomcat.util.net.DispatchType; + /** * This is a light-weight abstract processor implementation that is intended as * a basis for all Processor implementations from the light-weight upgrade @@ -23,4 +29,40 @@ package org.apache.coyote; */ public abstract class AbstractProcessorLight implements Processor { + private Set<DispatchType> dispatches = new CopyOnWriteArraySet<>(); + + + @Override + public void addDispatch(DispatchType dispatchType) { + synchronized (dispatches) { + dispatches.add(dispatchType); + } + } + + + @Override + public Iterator<DispatchType> getIteratorAndClearDispatches() { + // Note: Logic in AbstractProtocol depends on this method only returning + // a non-null value if the iterator is non-empty. i.e. it should never + // return an empty iterator. + Iterator<DispatchType> result; + synchronized (dispatches) { + // Synchronized as the generation of the iterator and the clearing + // of dispatches needs to be an atomic operation. + result = dispatches.iterator(); + if (result.hasNext()) { + dispatches.clear(); + } else { + result = null; + } + } + return result; + } + + + protected void clearDispatches() { + synchronized (dispatches) { + dispatches.clear(); + } + } } Modified: tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java?rev=1709543&r1=1709542&r2=1709543&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java (original) +++ tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java Tue Oct 20 09:00:08 2015 @@ -758,7 +758,7 @@ public abstract class AbstractProtocol<S if (dispatches == null || !dispatches.hasNext()) { // Only returns non-null iterator if there are // dispatches to process. - dispatches = wrapper.getIteratorAndClearDispatches(); + dispatches = processor.getIteratorAndClearDispatches(); } } while (state == SocketState.ASYNC_END || state == SocketState.UPGRADING || Modified: tomcat/trunk/java/org/apache/coyote/Processor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/Processor.java?rev=1709543&r1=1709542&r2=1709543&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/Processor.java (original) +++ tomcat/trunk/java/org/apache/coyote/Processor.java Tue Oct 20 09:00:08 2015 @@ -18,11 +18,13 @@ package org.apache.coyote; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Iterator; import java.util.concurrent.Executor; import javax.servlet.http.HttpUpgradeHandler; import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState; +import org.apache.tomcat.util.net.DispatchType; import org.apache.tomcat.util.net.SSLSupport; import org.apache.tomcat.util.net.SocketStatus; import org.apache.tomcat.util.net.SocketWrapperBase; @@ -96,4 +98,8 @@ public interface Processor { * an existing multiplexed connection. */ void pause(); + + void addDispatch(DispatchType dispatchType); + + Iterator<DispatchType> getIteratorAndClearDispatches(); } Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java?rev=1709543&r1=1709542&r2=1709543&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java (original) +++ tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java Tue Oct 20 09:00:08 2015 @@ -483,7 +483,7 @@ public class AjpProcessor extends Abstra break; } case ASYNC_COMPLETE: { - socketWrapper.clearDispatches(); + clearDispatches(); if (asyncStateMachine.asyncComplete()) { socketWrapper.processSocket(SocketStatus.OPEN_READ, true); } @@ -573,15 +573,15 @@ public class AjpProcessor extends Abstra break; } case DISPATCH_READ: { - socketWrapper.addDispatch(DispatchType.NON_BLOCKING_READ); + addDispatch(DispatchType.NON_BLOCKING_READ); break; } case DISPATCH_WRITE: { - socketWrapper.addDispatch(DispatchType.NON_BLOCKING_WRITE); + addDispatch(DispatchType.NON_BLOCKING_WRITE); break; } case DISPATCH_EXECUTE: { - socketWrapper.executeNonBlockingDispatches(); + socketWrapper.executeNonBlockingDispatches(getIteratorAndClearDispatches()); break; } case CLOSE_NOW: { Modified: tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java?rev=1709543&r1=1709542&r2=1709543&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java Tue Oct 20 09:00:08 2015 @@ -731,7 +731,7 @@ public class Http11Processor extends Abs break; } case ASYNC_COMPLETE: { - socketWrapper.clearDispatches(); + clearDispatches(); if (asyncStateMachine.asyncComplete()) { socketWrapper.processSocket(SocketStatus.OPEN_READ, true); } @@ -776,17 +776,17 @@ public class Http11Processor extends Abs break; } case DISPATCH_READ: { - socketWrapper.addDispatch(DispatchType.NON_BLOCKING_READ); + addDispatch(DispatchType.NON_BLOCKING_READ); break; } case DISPATCH_WRITE: { - socketWrapper.addDispatch(DispatchType.NON_BLOCKING_WRITE); + addDispatch(DispatchType.NON_BLOCKING_WRITE); break; } case DISPATCH_EXECUTE: { SocketWrapperBase<?> wrapper = socketWrapper; if (wrapper != null) { - wrapper.executeNonBlockingDispatches(); + wrapper.executeNonBlockingDispatches(getIteratorAndClearDispatches()); } break; } Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorExternal.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorExternal.java?rev=1709543&r1=1709542&r2=1709543&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorExternal.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorExternal.java Tue Oct 20 09:00:08 2015 @@ -43,8 +43,8 @@ public class UpgradeProcessorExternal ex public UpgradeProcessorExternal(SocketWrapperBase<?> wrapper, ByteBuffer leftOverInput, HttpUpgradeHandler httpUpgradeHandler) { super(wrapper, leftOverInput, httpUpgradeHandler); - this.upgradeServletInputStream = new UpgradeServletInputStream(wrapper); - this.upgradeServletOutputStream = new UpgradeServletOutputStream(wrapper); + this.upgradeServletInputStream = new UpgradeServletInputStream(this, wrapper); + this.upgradeServletOutputStream = new UpgradeServletOutputStream(this, wrapper); wrapper.unRead(leftOverInput); /* Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletInputStream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletInputStream.java?rev=1709543&r1=1709542&r2=1709543&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletInputStream.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletInputStream.java Tue Oct 20 09:00:08 2015 @@ -22,6 +22,7 @@ import javax.servlet.ReadListener; import javax.servlet.ServletInputStream; import org.apache.coyote.ContainerThreadMarker; +import org.apache.coyote.Processor; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; import org.apache.tomcat.util.ExceptionUtils; @@ -35,6 +36,7 @@ public class UpgradeServletInputStream e private static final StringManager sm = StringManager.getManager(UpgradeServletInputStream.class); + private final Processor processor; private final SocketWrapperBase<?> socketWrapper; private volatile boolean closed = false; @@ -45,7 +47,8 @@ public class UpgradeServletInputStream e private volatile ClassLoader applicationLoader = null; - public UpgradeServletInputStream(SocketWrapperBase<?> socketWrapper) { + public UpgradeServletInputStream(Processor processor, SocketWrapperBase<?> socketWrapper) { + this.processor = processor; this.socketWrapper = socketWrapper; } @@ -101,7 +104,7 @@ public class UpgradeServletInputStream e // Container is responsible for first call to onDataAvailable(). if (ContainerThreadMarker.isContainerThread()) { - socketWrapper.addDispatch(DispatchType.NON_BLOCKING_READ); + processor.addDispatch(DispatchType.NON_BLOCKING_READ); } else { socketWrapper.registerReadInterest(); } Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java?rev=1709543&r1=1709542&r2=1709543&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java Tue Oct 20 09:00:08 2015 @@ -22,6 +22,7 @@ import javax.servlet.ServletOutputStream import javax.servlet.WriteListener; import org.apache.coyote.ContainerThreadMarker; +import org.apache.coyote.Processor; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; import org.apache.tomcat.util.ExceptionUtils; @@ -35,6 +36,7 @@ public class UpgradeServletOutputStream private static final StringManager sm = StringManager.getManager(UpgradeServletOutputStream.class); + private final Processor processor; private final SocketWrapperBase<?> socketWrapper; // Used to ensure that isReady() and onWritePossible() have a consistent @@ -61,7 +63,8 @@ public class UpgradeServletOutputStream private volatile ClassLoader applicationLoader = null; - public UpgradeServletOutputStream(SocketWrapperBase<?> socketWrapper) { + public UpgradeServletOutputStream(Processor processor, SocketWrapperBase<?> socketWrapper) { + this.processor = processor; this.socketWrapper = socketWrapper; } @@ -115,7 +118,7 @@ public class UpgradeServletOutputStream registered = true; // Container is responsible for first call to onDataAvailable(). if (ContainerThreadMarker.isContainerThread()) { - socketWrapper.addDispatch(DispatchType.NON_BLOCKING_WRITE); + processor.addDispatch(DispatchType.NON_BLOCKING_WRITE); } else { socketWrapper.registerWriteInterest(); } Modified: tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java?rev=1709543&r1=1709542&r2=1709543&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java (original) +++ tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java Tue Oct 20 09:00:08 2015 @@ -19,8 +19,6 @@ package org.apache.coyote.http2; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Iterator; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicBoolean; import javax.servlet.http.HttpUpgradeHandler; @@ -48,7 +46,6 @@ public class StreamProcessor extends Abs private static final StringManager sm = StringManager.getManager(StreamProcessor.class); private final Stream stream; - private Set<DispatchType> dispatches = new CopyOnWriteArraySet<>(); private volatile SSLSupport sslSupport; @@ -374,11 +371,11 @@ public class StreamProcessor extends Abs break; } case DISPATCH_READ: { - dispatches.add(DispatchType.NON_BLOCKING_READ); + addDispatch(DispatchType.NON_BLOCKING_READ); break; } case DISPATCH_WRITE: { - dispatches.add(DispatchType.NON_BLOCKING_WRITE); + addDispatch(DispatchType.NON_BLOCKING_WRITE); break; } case DISPATCH_EXECUTE: { @@ -482,35 +479,6 @@ public class StreamProcessor extends Abs } - public void addDispatch(DispatchType dispatchType) { - synchronized (dispatches) { - dispatches.add(dispatchType); - } - } - public Iterator<DispatchType> getIteratorAndClearDispatches() { - // Note: Logic in AbstractProtocol depends on this method only returning - // a non-null value if the iterator is non-empty. i.e. it should never - // return an empty iterator. - Iterator<DispatchType> result; - synchronized (dispatches) { - // Synchronized as the generation of the iterator and the clearing - // of dispatches needs to be an atomic operation. - result = dispatches.iterator(); - if (result.hasNext()) { - dispatches.clear(); - } else { - result = null; - } - } - return result; - } - public void clearDispatches() { - synchronized (dispatches) { - dispatches.clear(); - } - } - - @Override public HttpUpgradeHandler getHttpUpgradeHandler() { // Should never happen Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java?rev=1709543&r1=1709542&r2=1709543&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java Tue Oct 20 09:00:08 2015 @@ -811,7 +811,8 @@ public abstract class AbstractEndpoint<S SocketStatus socketStatus, boolean dispatch); - public void executeNonBlockingDispatches(SocketWrapperBase<S> socketWrapper) { + public void executeNonBlockingDispatches(SocketWrapperBase<S> socketWrapper, + Iterator<DispatchType> dispatches) { /* * This method is called when non-blocking IO is initiated by defining * a read and/or write listener in a non-container thread. It is called @@ -831,8 +832,6 @@ public abstract class AbstractEndpoint<S * sure that the socket has been added to the waitingRequests queue. */ synchronized (socketWrapper) { - Iterator<DispatchType> dispatches = socketWrapper.getIteratorAndClearDispatches(); - while (dispatches != null && dispatches.hasNext()) { DispatchType dispatchType = dispatches.next(); processSocket(socketWrapper, dispatchType.getSocketStatus(), false); Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java?rev=1709543&r1=1709542&r2=1709543&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java Tue Oct 20 09:00:08 2015 @@ -20,8 +20,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.CompletionHandler; import java.util.Iterator; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; @@ -98,8 +96,6 @@ public abstract class SocketWrapperBase< */ protected int bufferedWriteSize = 64*1024; //64k default write buffer - private Set<DispatchType> dispatches = new CopyOnWriteArraySet<>(); - public SocketWrapperBase(E socket, AbstractEndpoint<E> endpoint) { this.socket = socket; this.endpoint = endpoint; @@ -298,34 +294,6 @@ public abstract class SocketWrapperBase< return socketBufferHandler.isWriteBufferWritable() && bufferedWrites.size() == 0; } - public void addDispatch(DispatchType dispatchType) { - synchronized (dispatches) { - dispatches.add(dispatchType); - } - } - public Iterator<DispatchType> getIteratorAndClearDispatches() { - // Note: Logic in AbstractProtocol depends on this method only returning - // a non-null value if the iterator is non-empty. i.e. it should never - // return an empty iterator. - Iterator<DispatchType> result; - synchronized (dispatches) { - // Synchronized as the generation of the iterator and the clearing - // of dispatches needs to be an atomic operation. - result = dispatches.iterator(); - if (result.hasNext()) { - dispatches.clear(); - } else { - result = null; - } - } - return result; - } - public void clearDispatches() { - synchronized (dispatches) { - dispatches.clear(); - } - } - /** * Overridden for debug purposes. No guarantees are made about the format of @@ -587,8 +555,8 @@ public abstract class SocketWrapperBase< } - public void executeNonBlockingDispatches() { - endpoint.executeNonBlockingDispatches(this); + public void executeNonBlockingDispatches(Iterator<DispatchType> dispatches) { + endpoint.executeNonBlockingDispatches(this, dispatches); } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org