Author: markt Date: Fri Feb 27 15:00:17 2015 New Revision: 1662694 URL: http://svn.apache.org/r1662694 Log: Checkpoint - switch WebSocket over to new UpgradeProcessorInternal
Added: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorInternal.java (with props) Modified: tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Protocol.java tomcat/trunk/java/org/apache/coyote/http11/upgrade/InternalHttpUpgradeHandler.java tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorBase.java tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorExternal.java tomcat/trunk/java/org/apache/tomcat/websocket/server/WsFrameServer.java tomcat/trunk/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java Modified: tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Protocol.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Protocol.java?rev=1662694&r1=1662693&r2=1662694&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Protocol.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Protocol.java Fri Feb 27 15:00:17 2015 @@ -23,7 +23,9 @@ import javax.servlet.http.HttpUpgradeHan import org.apache.coyote.AbstractProtocol; import org.apache.coyote.Processor; +import org.apache.coyote.http11.upgrade.InternalHttpUpgradeHandler; import org.apache.coyote.http11.upgrade.UpgradeProcessorExternal; +import org.apache.coyote.http11.upgrade.UpgradeProcessorInternal; import org.apache.tomcat.util.net.AbstractEndpoint; import org.apache.tomcat.util.net.SocketWrapperBase; @@ -283,7 +285,12 @@ public abstract class AbstractHttp11Prot SocketWrapperBase<?> socket, ByteBuffer leftoverInput, HttpUpgradeHandler httpUpgradeHandler) throws IOException { - return new UpgradeProcessorExternal(socket, leftoverInput, httpUpgradeHandler); + if (httpUpgradeHandler instanceof InternalHttpUpgradeHandler) { + return new UpgradeProcessorInternal(socket, leftoverInput, + (InternalHttpUpgradeHandler) httpUpgradeHandler); + } else { + return new UpgradeProcessorExternal(socket, leftoverInput, httpUpgradeHandler); + } } } } Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/InternalHttpUpgradeHandler.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/InternalHttpUpgradeHandler.java?rev=1662694&r1=1662693&r2=1662694&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/InternalHttpUpgradeHandler.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/InternalHttpUpgradeHandler.java Fri Feb 27 15:00:17 2015 @@ -20,6 +20,7 @@ import javax.servlet.http.HttpUpgradeHan import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState; import org.apache.tomcat.util.net.SocketStatus; +import org.apache.tomcat.util.net.SocketWrapperBase; /** @@ -34,4 +35,6 @@ import org.apache.tomcat.util.net.Socket public interface InternalHttpUpgradeHandler extends HttpUpgradeHandler { SocketState upgradeDispatch(SocketStatus status); + + void setSocketWrapper(SocketWrapperBase<?> wrapper); } \ No newline at end of file Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorBase.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorBase.java?rev=1662694&r1=1662693&r2=1662694&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorBase.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorBase.java Fri Feb 27 15:00:17 2015 @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.util.concurrent.Executor; import javax.servlet.http.HttpUpgradeHandler; +import javax.servlet.http.WebConnection; import org.apache.coyote.Processor; import org.apache.coyote.Request; @@ -29,7 +30,7 @@ import org.apache.tomcat.util.net.SSLSup import org.apache.tomcat.util.net.SocketStatus; import org.apache.tomcat.util.net.SocketWrapperBase; -public abstract class UpgradeProcessorBase implements Processor { +public abstract class UpgradeProcessorBase implements Processor, WebConnection { public UpgradeProcessorBase(SocketWrapperBase<?> wrapper, ByteBuffer leftOverInput) { wrapper.unRead(leftOverInput); Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorExternal.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorExternal.java?rev=1662694&r1=1662693&r2=1662694&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorExternal.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorExternal.java Fri Feb 27 15:00:17 2015 @@ -22,7 +22,6 @@ import java.nio.ByteBuffer; import javax.servlet.ServletInputStream; import javax.servlet.ServletOutputStream; import javax.servlet.http.HttpUpgradeHandler; -import javax.servlet.http.WebConnection; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; @@ -31,7 +30,7 @@ import org.apache.tomcat.util.net.Socket import org.apache.tomcat.util.net.SocketWrapperBase; import org.apache.tomcat.util.res.StringManager; -public class UpgradeProcessorExternal extends UpgradeProcessorBase implements WebConnection { +public class UpgradeProcessorExternal extends UpgradeProcessorBase { private static final int INFINITE_TIMEOUT = -1; Added: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorInternal.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorInternal.java?rev=1662694&view=auto ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorInternal.java (added) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorInternal.java Fri Feb 27 15:00:17 2015 @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.coyote.http11.upgrade; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import javax.servlet.ServletInputStream; +import javax.servlet.ServletOutputStream; + +import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState; +import org.apache.tomcat.util.net.SocketStatus; +import org.apache.tomcat.util.net.SocketWrapperBase; + +public class UpgradeProcessorInternal extends UpgradeProcessorBase { + + private final InternalHttpUpgradeHandler internalHttpUpgradeHandler; + + public UpgradeProcessorInternal(SocketWrapperBase<?> wrapper, ByteBuffer leftOverInput, + InternalHttpUpgradeHandler internalHttpUpgradeHandler) { + super(wrapper, leftOverInput); + this.internalHttpUpgradeHandler = internalHttpUpgradeHandler; + internalHttpUpgradeHandler.setSocketWrapper(wrapper); + } + + + @Override + public SocketState upgradeDispatch(SocketStatus status) { + return internalHttpUpgradeHandler.upgradeDispatch(status); + } + + + // --------------------------------------------------- AutoCloseable methods + + @Override + public void close() throws Exception { + internalHttpUpgradeHandler.destroy(); + } + + + // --------------------------------------------------- WebConnection methods + + @Override + public ServletInputStream getInputStream() throws IOException { + return null; + } + + @Override + public ServletOutputStream getOutputStream() throws IOException { + return null; + } +} Propchange: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorInternal.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsFrameServer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsFrameServer.java?rev=1662694&r1=1662693&r2=1662694&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsFrameServer.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsFrameServer.java Fri Feb 27 15:00:17 2015 @@ -18,22 +18,21 @@ package org.apache.tomcat.websocket.serv import java.io.IOException; -import javax.servlet.ServletInputStream; - +import org.apache.tomcat.util.net.SocketWrapperBase; import org.apache.tomcat.websocket.Transformation; import org.apache.tomcat.websocket.WsFrameBase; import org.apache.tomcat.websocket.WsSession; public class WsFrameServer extends WsFrameBase { - private final ServletInputStream sis; + private final SocketWrapperBase<?> socketWrapper; private final Object connectionReadLock = new Object(); - public WsFrameServer(ServletInputStream sis, WsSession wsSession, + public WsFrameServer(SocketWrapperBase<?> socketWrapper, WsSession wsSession, Transformation transformation) { super(wsSession, transformation); - this.sis = sis; + this.socketWrapper = socketWrapper; } @@ -45,10 +44,10 @@ public class WsFrameServer extends WsFra */ public void onDataAvailable() throws IOException { synchronized (connectionReadLock) { - while (isOpen() && sis.isReady()) { + while (isOpen() && socketWrapper.isReadyForRead()) { // Fill up the input buffer with as much data as we can - int read = sis.read( - inputBuffer, writePos, inputBuffer.length - writePos); + int read = socketWrapper.read( + false, inputBuffer, writePos, inputBuffer.length - writePos); if (read <= 0) { return; } Modified: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java?rev=1662694&r1=1662693&r2=1662694&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java Fri Feb 27 15:00:17 2015 @@ -21,10 +21,6 @@ import java.io.IOException; import java.util.List; import java.util.Map; -import javax.servlet.ReadListener; -import javax.servlet.ServletInputStream; -import javax.servlet.ServletOutputStream; -import javax.servlet.WriteListener; import javax.servlet.http.HttpSession; import javax.servlet.http.WebConnection; import javax.websocket.CloseReason; @@ -39,6 +35,7 @@ import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState; import org.apache.tomcat.util.net.SocketStatus; +import org.apache.tomcat.util.net.SocketWrapperBase; import org.apache.tomcat.util.res.StringManager; import org.apache.tomcat.websocket.Transformation; import org.apache.tomcat.websocket.WsIOException; @@ -56,6 +53,8 @@ public class WsHttpUpgradeHandler implem private final ClassLoader applicationClassLoader; + private SocketWrapperBase<?> socketWrapper; + private Endpoint ep; private EndpointConfig endpointConfig; private WsServerContainer webSocketContainer; @@ -67,6 +66,8 @@ public class WsHttpUpgradeHandler implem private boolean secure; private WebConnection connection; + private WsRemoteEndpointImplServer wsRemoteEndpointServer; + private WsFrameServer wsFrame; private WsSession wsSession; @@ -75,6 +76,12 @@ public class WsHttpUpgradeHandler implem } + @Override + public void setSocketWrapper(SocketWrapperBase<?> socketWrapper) { + this.socketWrapper = socketWrapper; + } + + public void preInit(Endpoint ep, EndpointConfig endpointConfig, WsServerContainer wsc, WsHandshakeRequest handshakeRequest, List<Extension> negotiatedExtensionsPhase2, String subProtocol, @@ -99,17 +106,6 @@ public class WsHttpUpgradeHandler implem sm.getString("wsHttpUpgradeHandler.noPreInit")); } - this.connection = connection; - - ServletInputStream sis; - ServletOutputStream sos; - try { - sis = connection.getInputStream(); - sos = connection.getOutputStream(); - } catch (IOException e) { - throw new IllegalStateException(e); - } - String httpSessionId = null; Object session = handshakeRequest.getHttpSession(); if (session != null ) { @@ -123,8 +119,7 @@ public class WsHttpUpgradeHandler implem ClassLoader cl = t.getContextClassLoader(); t.setContextClassLoader(applicationClassLoader); try { - WsRemoteEndpointImplServer wsRemoteEndpointServer = - new WsRemoteEndpointImplServer(sis, sos, webSocketContainer); + wsRemoteEndpointServer = new WsRemoteEndpointImplServer(socketWrapper, webSocketContainer); wsSession = new WsSession(ep, wsRemoteEndpointServer, webSocketContainer, handshakeRequest.getRequestURI(), handshakeRequest.getParameterMap(), @@ -132,18 +127,12 @@ public class WsHttpUpgradeHandler implem handshakeRequest.getUserPrincipal(), httpSessionId, negotiatedExtensions, subProtocol, pathParameters, secure, endpointConfig); - WsFrameServer wsFrame = new WsFrameServer(sis, wsSession, transformation); - sos.setWriteListener(new WsWriteListener(this, wsRemoteEndpointServer)); + wsFrame = new WsFrameServer(socketWrapper, wsSession, transformation); // WsFrame adds the necessary final transformations. Copy the // completed transformation chain to the remote end point. wsRemoteEndpointServer.setTransformation(wsFrame.getTransformation()); ep.onOpen(wsSession, endpointConfig); webSocketContainer.registerSession(ep, wsSession); - try { - sis.setReadListener(new WsReadListener(this, wsFrame)); - } catch (IllegalStateException e) { - // It is not impossible that the stream is already closed during onOpen - } } catch (DeploymentException e) { throw new IllegalArgumentException(e); } finally { @@ -154,8 +143,48 @@ public class WsHttpUpgradeHandler implem @Override public SocketState upgradeDispatch(SocketStatus status) { - // TODO Auto-generated method stub - return null; + switch (status) { + case OPEN_READ: + try { + wsFrame.onDataAvailable(); + } catch (WsIOException ws) { + close(ws.getCloseReason()); + } catch (EOFException eof) { + CloseReason cr = new CloseReason( + CloseCodes.CLOSED_ABNORMALLY, eof.getMessage()); + close(cr); + } catch (IOException ioe) { + onError(ioe); + CloseReason cr = new CloseReason( + CloseCodes.CLOSED_ABNORMALLY, ioe.getMessage()); + close(cr); + } + break; + case OPEN_WRITE: + wsRemoteEndpointServer.onWritePossible(false); + break; + case STOP: + // TODO i18n + CloseReason cr = new CloseReason(CloseCodes.GOING_AWAY, ""); + try { + wsSession.close(cr); + } catch (IOException ioe) { + onError(ioe); + cr = new CloseReason( + CloseCodes.CLOSED_ABNORMALLY, ioe.getMessage()); + close(cr); + } + break; + case ASYNC_READ_ERROR: + case ASYNC_WRITE_ERROR: + case CLOSE_NOW: + case DISCONNECT: + case ERROR: + case TIMEOUT: + return SocketState.CLOSED; + + } + return SocketState.UPGRADED; } @@ -195,77 +224,4 @@ public class WsHttpUpgradeHandler implem */ wsSession.onClose(cr); } - - - private static class WsReadListener implements ReadListener { - - private final WsHttpUpgradeHandler wsProtocolHandler; - private final WsFrameServer wsFrame; - - - private WsReadListener(WsHttpUpgradeHandler wsProtocolHandler, - WsFrameServer wsFrame) { - this.wsProtocolHandler = wsProtocolHandler; - this.wsFrame = wsFrame; - } - - - @Override - public void onDataAvailable() { - try { - wsFrame.onDataAvailable(); - } catch (WsIOException ws) { - wsProtocolHandler.close(ws.getCloseReason()); - } catch (EOFException eof) { - CloseReason cr = new CloseReason( - CloseCodes.CLOSED_ABNORMALLY, eof.getMessage()); - wsProtocolHandler.close(cr); - } catch (IOException ioe) { - onError(ioe); - CloseReason cr = new CloseReason( - CloseCodes.CLOSED_ABNORMALLY, ioe.getMessage()); - wsProtocolHandler.close(cr); - } - } - - - @Override - public void onAllDataRead() { - // NO-OP - } - - - @Override - public void onError(Throwable throwable) { - wsProtocolHandler.onError(throwable); - } - } - - - private static class WsWriteListener implements WriteListener { - - private final WsHttpUpgradeHandler wsProtocolHandler; - private final WsRemoteEndpointImplServer wsRemoteEndpointServer; - - private WsWriteListener(WsHttpUpgradeHandler wsProtocolHandler, - WsRemoteEndpointImplServer wsRemoteEndpointServer) { - this.wsProtocolHandler = wsProtocolHandler; - this.wsRemoteEndpointServer = wsRemoteEndpointServer; - } - - - @Override - public void onWritePossible() { - // Triggered by the poller so this isn't the same thread that - // triggered the write so no need for a dispatch - wsRemoteEndpointServer.onWritePossible(false); - } - - - @Override - public void onError(Throwable throwable) { - wsProtocolHandler.onError(throwable); - wsRemoteEndpointServer.close(); - } - } } Modified: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java?rev=1662694&r1=1662693&r2=1662694&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java Fri Feb 27 15:00:17 2015 @@ -24,21 +24,19 @@ import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; -import javax.servlet.ServletInputStream; -import javax.servlet.ServletOutputStream; import javax.websocket.SendHandler; import javax.websocket.SendResult; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; +import org.apache.tomcat.util.net.SocketWrapperBase; import org.apache.tomcat.util.res.StringManager; import org.apache.tomcat.websocket.Transformation; import org.apache.tomcat.websocket.WsRemoteEndpointImplBase; /** * This is the server side {@link javax.websocket.RemoteEndpoint} implementation - * - i.e. what the server uses to send data to the client. Communication is over - * a {@link ServletOutputStream}. + * - i.e. what the server uses to send data to the client. */ public class WsRemoteEndpointImplServer extends WsRemoteEndpointImplBase { @@ -50,8 +48,7 @@ public class WsRemoteEndpointImplServer private static final Queue<OnResultRunnable> onResultRunnables = new ConcurrentLinkedQueue<>(); - private final ServletInputStream sis; - private final ServletOutputStream sos; + private final SocketWrapperBase<?> socketWrapper; private final WsWriteTimeout wsWriteTimeout; private final ExecutorService executorService; private volatile SendHandler handler = null; @@ -60,10 +57,9 @@ public class WsRemoteEndpointImplServer private volatile long timeoutExpiry = -1; private volatile boolean close; - public WsRemoteEndpointImplServer(ServletInputStream sis, ServletOutputStream sos, + public WsRemoteEndpointImplServer(SocketWrapperBase<?> socketWrapper, WsServerContainer serverContainer) { - this.sis = sis; - this.sos = sos; + this.socketWrapper = socketWrapper; this.wsWriteTimeout = serverContainer.getTimeout(); this.executorService = serverContainer.getExecutorService(); } @@ -95,20 +91,20 @@ public class WsRemoteEndpointImplServer boolean complete = false; try { // If this is false there will be a call back when it is true - while (sos.isReady()) { + while (socketWrapper.isReadyForWrite()) { complete = true; for (ByteBuffer buffer : buffers) { if (buffer.hasRemaining()) { complete = false; - sos.write(buffer.array(), buffer.arrayOffset(), - buffer.limit()); + socketWrapper.write( + false, buffer.array(), buffer.arrayOffset(), buffer.limit()); buffer.position(buffer.limit()); break; } } if (complete) { - sos.flush(); - complete = sos.isReady(); + socketWrapper.flush(false); + complete = socketWrapper.isReadyForWrite(); if (complete) { wsWriteTimeout.unregister(this); clearHandler(null, useDispatch); @@ -147,14 +143,7 @@ public class WsRemoteEndpointImplServer clearHandler(new EOFException(), true); } try { - sos.close(); - } catch (IOException e) { - if (log.isInfoEnabled()) { - log.info(sm.getString("wsRemoteEndpointServer.closeFailed"), e); - } - } - try { - sis.close(); + socketWrapper.close(); } catch (IOException e) { if (log.isInfoEnabled()) { log.info(sm.getString("wsRemoteEndpointServer.closeFailed"), e); --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org