Author: markt
Date: Tue Oct 20 09:00:08 2015
New Revision: 1709543
URL: http://svn.apache.org/viewvc?rev=1709543&view=rev
Log:
Move the additional dispatches required from the SocketWrapper to the
Processor
Modified:
tomcat/trunk/java/org/apache/coyote/AbstractProcessorLight.java
tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java
tomcat/trunk/java/org/apache/coyote/Processor.java
tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java
tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorExternal.java
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletInputStream.java
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java
tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java
tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java
tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java
Modified: tomcat/trunk/java/org/apache/coyote/AbstractProcessorLight.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AbstractProcessorLight.java?rev=1709543&r1=1709542&r2=1709543&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/AbstractProcessorLight.java (original)
+++ tomcat/trunk/java/org/apache/coyote/AbstractProcessorLight.java Tue Oct 20
09:00:08 2015
@@ -16,6 +16,12 @@
*/
package org.apache.coyote;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import org.apache.tomcat.util.net.DispatchType;
+
/**
* This is a light-weight abstract processor implementation that is intended as
* a basis for all Processor implementations from the light-weight upgrade
@@ -23,4 +29,40 @@ package org.apache.coyote;
*/
public abstract class AbstractProcessorLight implements Processor {
+ private Set<DispatchType> dispatches = new CopyOnWriteArraySet<>();
+
+
+ @Override
+ public void addDispatch(DispatchType dispatchType) {
+ synchronized (dispatches) {
+ dispatches.add(dispatchType);
+ }
+ }
+
+
+ @Override
+ public Iterator<DispatchType> getIteratorAndClearDispatches() {
+ // Note: Logic in AbstractProtocol depends on this method only returning
+ // a non-null value if the iterator is non-empty. i.e. it should never
+ // return an empty iterator.
+ Iterator<DispatchType> result;
+ synchronized (dispatches) {
+ // Synchronized as the generation of the iterator and the clearing
+ // of dispatches needs to be an atomic operation.
+ result = dispatches.iterator();
+ if (result.hasNext()) {
+ dispatches.clear();
+ } else {
+ result = null;
+ }
+ }
+ return result;
+ }
+
+
+ protected void clearDispatches() {
+ synchronized (dispatches) {
+ dispatches.clear();
+ }
+ }
}
Modified: tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java?rev=1709543&r1=1709542&r2=1709543&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java (original)
+++ tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java Tue Oct 20
09:00:08 2015
@@ -758,7 +758,7 @@ public abstract class AbstractProtocol<S
if (dispatches == null || !dispatches.hasNext()) {
// Only returns non-null iterator if there are
// dispatches to process.
- dispatches = wrapper.getIteratorAndClearDispatches();
+ dispatches = processor.getIteratorAndClearDispatches();
}
} while (state == SocketState.ASYNC_END ||
state == SocketState.UPGRADING ||
Modified: tomcat/trunk/java/org/apache/coyote/Processor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/Processor.java?rev=1709543&r1=1709542&r2=1709543&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/Processor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/Processor.java Tue Oct 20 09:00:08 2015
@@ -18,11 +18,13 @@ package org.apache.coyote;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Iterator;
import java.util.concurrent.Executor;
import javax.servlet.http.HttpUpgradeHandler;
import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
+import org.apache.tomcat.util.net.DispatchType;
import org.apache.tomcat.util.net.SSLSupport;
import org.apache.tomcat.util.net.SocketStatus;
import org.apache.tomcat.util.net.SocketWrapperBase;
@@ -96,4 +98,8 @@ public interface Processor {
* an existing multiplexed connection.
*/
void pause();
+
+ void addDispatch(DispatchType dispatchType);
+
+ Iterator<DispatchType> getIteratorAndClearDispatches();
}
Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java?rev=1709543&r1=1709542&r2=1709543&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java Tue Oct 20
09:00:08 2015
@@ -483,7 +483,7 @@ public class AjpProcessor extends Abstra
break;
}
case ASYNC_COMPLETE: {
- socketWrapper.clearDispatches();
+ clearDispatches();
if (asyncStateMachine.asyncComplete()) {
socketWrapper.processSocket(SocketStatus.OPEN_READ, true);
}
@@ -573,15 +573,15 @@ public class AjpProcessor extends Abstra
break;
}
case DISPATCH_READ: {
- socketWrapper.addDispatch(DispatchType.NON_BLOCKING_READ);
+ addDispatch(DispatchType.NON_BLOCKING_READ);
break;
}
case DISPATCH_WRITE: {
- socketWrapper.addDispatch(DispatchType.NON_BLOCKING_WRITE);
+ addDispatch(DispatchType.NON_BLOCKING_WRITE);
break;
}
case DISPATCH_EXECUTE: {
- socketWrapper.executeNonBlockingDispatches();
+ socketWrapper.executeNonBlockingDispatches(getIteratorAndClearDispatches());
break;
}
case CLOSE_NOW: {
Modified: tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java?rev=1709543&r1=1709542&r2=1709543&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java Tue Oct 20
09:00:08 2015
@@ -731,7 +731,7 @@ public class Http11Processor extends Abs
break;
}
case ASYNC_COMPLETE: {
- socketWrapper.clearDispatches();
+ clearDispatches();
if (asyncStateMachine.asyncComplete()) {
socketWrapper.processSocket(SocketStatus.OPEN_READ, true);
}
@@ -776,17 +776,17 @@ public class Http11Processor extends Abs
break;
}
case DISPATCH_READ: {
- socketWrapper.addDispatch(DispatchType.NON_BLOCKING_READ);
+ addDispatch(DispatchType.NON_BLOCKING_READ);
break;
}
case DISPATCH_WRITE: {
- socketWrapper.addDispatch(DispatchType.NON_BLOCKING_WRITE);
+ addDispatch(DispatchType.NON_BLOCKING_WRITE);
break;
}
case DISPATCH_EXECUTE: {
SocketWrapperBase<?> wrapper = socketWrapper;
if (wrapper != null) {
- wrapper.executeNonBlockingDispatches();
+ wrapper.executeNonBlockingDispatches(getIteratorAndClearDispatches());
}
break;
}
Modified:
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorExternal.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorExternal.java?rev=1709543&r1=1709542&r2=1709543&view=diff
==============================================================================
---
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorExternal.java
(original)
+++
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorExternal.java
Tue Oct 20 09:00:08 2015
@@ -43,8 +43,8 @@ public class UpgradeProcessorExternal ex
public UpgradeProcessorExternal(SocketWrapperBase<?> wrapper, ByteBuffer
leftOverInput,
HttpUpgradeHandler httpUpgradeHandler) {
super(wrapper, leftOverInput, httpUpgradeHandler);
- this.upgradeServletInputStream = new UpgradeServletInputStream(wrapper);
- this.upgradeServletOutputStream = new UpgradeServletOutputStream(wrapper);
+ this.upgradeServletInputStream = new UpgradeServletInputStream(this, wrapper);
+ this.upgradeServletOutputStream = new UpgradeServletOutputStream(this, wrapper);
wrapper.unRead(leftOverInput);
/*
Modified:
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletInputStream.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletInputStream.java?rev=1709543&r1=1709542&r2=1709543&view=diff
==============================================================================
---
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletInputStream.java
(original)
+++
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletInputStream.java
Tue Oct 20 09:00:08 2015
@@ -22,6 +22,7 @@ import javax.servlet.ReadListener;
import javax.servlet.ServletInputStream;
import org.apache.coyote.ContainerThreadMarker;
+import org.apache.coyote.Processor;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.ExceptionUtils;
@@ -35,6 +36,7 @@ public class UpgradeServletInputStream e
private static final StringManager sm =
StringManager.getManager(UpgradeServletInputStream.class);
+ private final Processor processor;
private final SocketWrapperBase<?> socketWrapper;
private volatile boolean closed = false;
@@ -45,7 +47,8 @@ public class UpgradeServletInputStream e
private volatile ClassLoader applicationLoader = null;
- public UpgradeServletInputStream(SocketWrapperBase<?> socketWrapper) {
+ public UpgradeServletInputStream(Processor processor, SocketWrapperBase<?> socketWrapper) {
+ this.processor = processor;
this.socketWrapper = socketWrapper;
}
@@ -101,7 +104,7 @@ public class UpgradeServletInputStream e
// Container is responsible for first call to onDataAvailable().
if (ContainerThreadMarker.isContainerThread()) {
- socketWrapper.addDispatch(DispatchType.NON_BLOCKING_READ);
+ processor.addDispatch(DispatchType.NON_BLOCKING_READ);
} else {
socketWrapper.registerReadInterest();
}
Modified:
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java?rev=1709543&r1=1709542&r2=1709543&view=diff
==============================================================================
---
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java
(original)
+++
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java
Tue Oct 20 09:00:08 2015
@@ -22,6 +22,7 @@ import javax.servlet.ServletOutputStream
import javax.servlet.WriteListener;
import org.apache.coyote.ContainerThreadMarker;
+import org.apache.coyote.Processor;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.ExceptionUtils;
@@ -35,6 +36,7 @@ public class UpgradeServletOutputStream
private static final StringManager sm =
StringManager.getManager(UpgradeServletOutputStream.class);
+ private final Processor processor;
private final SocketWrapperBase<?> socketWrapper;
// Used to ensure that isReady() and onWritePossible() have a consistent
@@ -61,7 +63,8 @@ public class UpgradeServletOutputStream
private volatile ClassLoader applicationLoader = null;
- public UpgradeServletOutputStream(SocketWrapperBase<?> socketWrapper) {
+ public UpgradeServletOutputStream(Processor processor, SocketWrapperBase<?> socketWrapper) {
+ this.processor = processor;
this.socketWrapper = socketWrapper;
}
@@ -115,7 +118,7 @@ public class UpgradeServletOutputStream
registered = true;
// Container is responsible for first call to onDataAvailable().
if (ContainerThreadMarker.isContainerThread()) {
- socketWrapper.addDispatch(DispatchType.NON_BLOCKING_WRITE);
+ processor.addDispatch(DispatchType.NON_BLOCKING_WRITE);
} else {
socketWrapper.registerWriteInterest();
}
Modified: tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java?rev=1709543&r1=1709542&r2=1709543&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java Tue Oct 20
09:00:08 2015
@@ -19,8 +19,6 @@ package org.apache.coyote.http2;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.servlet.http.HttpUpgradeHandler;
@@ -48,7 +46,6 @@ public class StreamProcessor extends Abs
private static final StringManager sm =
StringManager.getManager(StreamProcessor.class);
private final Stream stream;
- private Set<DispatchType> dispatches = new CopyOnWriteArraySet<>();
private volatile SSLSupport sslSupport;
@@ -374,11 +371,11 @@ public class StreamProcessor extends Abs
break;
}
case DISPATCH_READ: {
- dispatches.add(DispatchType.NON_BLOCKING_READ);
+ addDispatch(DispatchType.NON_BLOCKING_READ);
break;
}
case DISPATCH_WRITE: {
- dispatches.add(DispatchType.NON_BLOCKING_WRITE);
+ addDispatch(DispatchType.NON_BLOCKING_WRITE);
break;
}
case DISPATCH_EXECUTE: {
@@ -482,35 +479,6 @@ public class StreamProcessor extends Abs
}
- public void addDispatch(DispatchType dispatchType) {
- synchronized (dispatches) {
- dispatches.add(dispatchType);
- }
- }
- public Iterator<DispatchType> getIteratorAndClearDispatches() {
- // Note: Logic in AbstractProtocol depends on this method only returning
- // a non-null value if the iterator is non-empty. i.e. it should never
- // return an empty iterator.
- Iterator<DispatchType> result;
- synchronized (dispatches) {
- // Synchronized as the generation of the iterator and the clearing
- // of dispatches needs to be an atomic operation.
- result = dispatches.iterator();
- if (result.hasNext()) {
- dispatches.clear();
- } else {
- result = null;
- }
- }
- return result;
- }
- public void clearDispatches() {
- synchronized (dispatches) {
- dispatches.clear();
- }
- }
-
-
@Override
public HttpUpgradeHandler getHttpUpgradeHandler() {
// Should never happen
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java?rev=1709543&r1=1709542&r2=1709543&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java
(original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java Tue Oct
20 09:00:08 2015
@@ -811,7 +811,8 @@ public abstract class AbstractEndpoint<S
SocketStatus socketStatus, boolean dispatch);
- public void executeNonBlockingDispatches(SocketWrapperBase<S> socketWrapper) {
+ public void executeNonBlockingDispatches(SocketWrapperBase<S> socketWrapper,
+ Iterator<DispatchType> dispatches) {
/*
* This method is called when non-blocking IO is initiated by defining
* a read and/or write listener in a non-container thread. It is called
@@ -831,8 +832,6 @@ public abstract class AbstractEndpoint<S
* sure that the socket has been added to the waitingRequests queue.
*/
synchronized (socketWrapper) {
- Iterator<DispatchType> dispatches = socketWrapper.getIteratorAndClearDispatches();
-
while (dispatches != null && dispatches.hasNext()) {
DispatchType dispatchType = dispatches.next();
processSocket(socketWrapper, dispatchType.getSocketStatus(),
false);
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java?rev=1709543&r1=1709542&r2=1709543&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java
(original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java Tue Oct
20 09:00:08 2015
@@ -20,8 +20,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
@@ -98,8 +96,6 @@ public abstract class SocketWrapperBase<
*/
protected int bufferedWriteSize = 64*1024; //64k default write buffer
- private Set<DispatchType> dispatches = new CopyOnWriteArraySet<>();
-
public SocketWrapperBase(E socket, AbstractEndpoint<E> endpoint) {
this.socket = socket;
this.endpoint = endpoint;
@@ -298,34 +294,6 @@ public abstract class SocketWrapperBase<
return socketBufferHandler.isWriteBufferWritable() &&
bufferedWrites.size() == 0;
}
- public void addDispatch(DispatchType dispatchType) {
- synchronized (dispatches) {
- dispatches.add(dispatchType);
- }
- }
- public Iterator<DispatchType> getIteratorAndClearDispatches() {
- // Note: Logic in AbstractProtocol depends on this method only returning
- // a non-null value if the iterator is non-empty. i.e. it should never
- // return an empty iterator.
- Iterator<DispatchType> result;
- synchronized (dispatches) {
- // Synchronized as the generation of the iterator and the clearing
- // of dispatches needs to be an atomic operation.
- result = dispatches.iterator();
- if (result.hasNext()) {
- dispatches.clear();
- } else {
- result = null;
- }
- }
- return result;
- }
- public void clearDispatches() {
- synchronized (dispatches) {
- dispatches.clear();
- }
- }
-
/**
* Overridden for debug purposes. No guarantees are made about the format
of
@@ -587,8 +555,8 @@ public abstract class SocketWrapperBase<
}
- public void executeNonBlockingDispatches() {
- endpoint.executeNonBlockingDispatches(this);
+ public void executeNonBlockingDispatches(Iterator<DispatchType> dispatches) {
+ endpoint.executeNonBlockingDispatches(this, dispatches);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]