This is an automated email from the ASF dual-hosted git repository. elecharny pushed a commit to branch 2.1.X in repository https://gitbox.apache.org/repos/asf/mina.git
The following commit(s) were added to refs/heads/2.1.X by this push: new 6ab1017cd Backported patch for DIRMINA-1169 6ab1017cd is described below commit 6ab1017cd0e50d125d4e6450cf42d48349126c84 Author: emmanuel lecharny <elecha...@apache.org> AuthorDate: Mon May 22 09:24:55 2023 +0200 Backported patch for DIRMINA-1169 --- .../core/polling/AbstractPollingIoAcceptor.java | 98 ++++++++++++++-------- .../transport/socket/nio/NioSocketAcceptor.java | 42 +++++++++- .../transport/socket/nio/SocketAcceptorTest.java | 71 ++++++++++++++++ 3 files changed, 175 insertions(+), 36 deletions(-) diff --git a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java index 8ca46a9e9..3fc26993f 100644 --- a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java +++ b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java @@ -22,6 +22,8 @@ package org.apache.mina.core.polling; import java.net.SocketAddress; import java.nio.channels.ClosedSelectorException; import java.nio.channels.spi.SelectorProvider; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -335,6 +337,13 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H> wakeup(); } + /** + * Invoked when a bind request has been registered for processing. The default implementation does nothing. + */ + protected void bindRequestAdded() { + // Nothing + } + /** * {@inheritDoc} */ @@ -347,6 +356,7 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H> // adds the Registration request to the queue for the Workers // to handle registerQueue.add(request); + bindRequestAdded(); // creates the Acceptor instance and has the local // executor kick it off. @@ -429,6 +439,54 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H> throw future.getException(); } } + + /** + * Handles new incoming connections by accepting the connections and creating new sessions for them. + * + * @param handles the connection handles to accept and create new sessions for + * @throws Exception on errors + */ + @SuppressWarnings("unchecked") + protected void processHandles(Iterator<H> handles) throws Exception { + while (handles.hasNext()) { + H handle = handles.next(); + handles.remove(); + + // Associates a new created connection to a processor, + // and get back a session + S session = accept(processor, handle); + + if (session == null) { + continue; + } + + initSession(session, null, null); + + // add the session to the SocketIoProcessor + session.getProcessor().add(session); + } + } + + /** + * Tells whether there are pending unbindings. + * + * @return {@code true} if there are any unbindings pending; {@code false} otherwise + */ + protected boolean hasUnbindings() { + return !cancelQueue.isEmpty(); + } + + /** + * Processes the futures for executed unbindings, marking all futures as done. + * + * @param unboundFutures describing the unbindings + * @throws Exception on errors + */ + protected void handleUnbound(Collection<AcceptorOperationFuture> unboundFutures) throws Exception { + for (AcceptorOperationFuture unboundFuture:unboundFutures) { + unboundFuture.setDone(); + } + } /** * This class is called by the startupAcceptor() method and is @@ -489,9 +547,11 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H> // them here. processHandles(selectedHandles()); } - + // check to see if any cancellation request has been made. - nHandles -= unregisterHandles(); + Collection<AcceptorOperationFuture> cancellations = new ArrayList<>(); + nHandles -= unregisterHandles(cancellations); + handleUnbound(cancellations); } catch (ClosedSelectorException cse) { // If the selector has been closed, we can exit the loop ExceptionMonitor.getInstance().exceptionCaught(cse); @@ -530,36 +590,6 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H> } } - /** - * This method will process new sessions for the Worker class. All - * keys that have had their status updates as per the Selector.selectedKeys() - * method will be processed here. Only keys that are ready to accept - * connections are handled here. - * <p/> - * Session objects are created by making new instances of SocketSessionImpl - * and passing the session object to the SocketIoProcessor class. - */ - @SuppressWarnings("unchecked") - private void processHandles(Iterator<H> handles) throws Exception { - while (handles.hasNext()) { - H handle = handles.next(); - handles.remove(); - - // Associates a new created connection to a processor, - // and get back a session - S session = accept(processor, handle); - - if (session == null) { - continue; - } - - initSession(session, null, null); - - // add the session to the SocketIoProcessor - session.getProcessor().add(session); - } - } - /** * Sets up the socket communications. Sets items such as: * <p/> @@ -628,7 +658,7 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H> * is CancellationRequest objects and the only place this happens is in * the doUnbind() method. */ - private int unregisterHandles() { + private int unregisterHandles(Collection<AcceptorOperationFuture> cancelled) { int cancelledHandles = 0; for (;;) { AcceptorOperationFuture future = cancelQueue.poll(); @@ -654,7 +684,7 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H> } } - future.setDone(); + cancelled.add(future); } return cancelledHandles; diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java index fc672b09d..836326609 100644 --- a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java +++ b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java @@ -135,6 +135,35 @@ implements SocketAcceptor { } } + /** + * {@inheritDoc} + */ + @Override + protected void handleUnbound(Collection<AcceptorOperationFuture> unboundFutures) throws Exception { + // If we're on Java >= 11, unbindings may take effect only on the next select() + // TODO: add a check (java.specification.version?) to do this only on a JVM >= 11? + if (!unboundFutures.isEmpty()) { + int selected = 0; + try { + // Simply select() would also work since wakeup() *was* called, but let's be explicit. + selected = selector.selectNow(); + } finally { + super.handleUnbound(unboundFutures); // Marks the futures as done + if (hasUnbindings()) { + // Depending on when these new unbindings were added, their wakeup() call may just have been + // cancelled by the above select. Re-instate it, so that the next select will not block, as + // expected. + wakeup(); + } + } + if (selected > 0) { + processHandles(selectedHandles()); + } + } else { + super.handleUnbound(unboundFutures); + } + } + /** * {@inheritDoc} */ @@ -148,6 +177,7 @@ implements SocketAcceptor { /** * {@inheritDoc} */ + @Override public TransportMetadata getTransportMetadata() { return NioSocketSession.METADATA; } @@ -171,6 +201,7 @@ implements SocketAcceptor { /** * {@inheritDoc} */ + @Override public void setDefaultLocalAddress(InetSocketAddress localAddress) { setDefaultLocalAddress((SocketAddress) localAddress); } @@ -269,8 +300,12 @@ implements SocketAcceptor { String newMessage = "Error while binding on " + localAddress; Exception e = new IOException(newMessage, ioe); - // And close the channel - channel.close(); + try { + // And close the channel + channel.close(); + } catch (IOException nested) { + e.addSuppressed(nested); + } throw e; } @@ -364,6 +399,7 @@ implements SocketAcceptor { * @return <tt>true</tt> if there is at least one more * SockectChannel object to read */ + @Override public boolean hasNext() { return iterator.hasNext(); } @@ -374,6 +410,7 @@ implements SocketAcceptor { * * @return The next SocketChannel in the iterator */ + @Override public ServerSocketChannel next() { SelectionKey key = iterator.next(); @@ -387,6 +424,7 @@ implements SocketAcceptor { /** * Remove the current SocketChannel from the iterator */ + @Override public void remove() { iterator.remove(); } diff --git a/mina-core/src/test/java/org/apache/mina/transport/socket/nio/SocketAcceptorTest.java b/mina-core/src/test/java/org/apache/mina/transport/socket/nio/SocketAcceptorTest.java new file mode 100644 index 000000000..3c09ccc87 --- /dev/null +++ b/mina-core/src/test/java/org/apache/mina/transport/socket/nio/SocketAcceptorTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.transport.socket.nio; + +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; + +import org.apache.mina.core.service.IoHandlerAdapter; +import org.apache.mina.util.AvailablePortFinder; +import org.junit.Test; + +public class SocketAcceptorTest { + + @Test + public void testBindTwice() throws Exception { + NioSocketAcceptor acceptor = new NioSocketAcceptor() { + + private int nRequests; + + private CountDownLatch secondRequestAdded = new CountDownLatch(1); + + @Override + protected void bindRequestAdded() { + super.bindRequestAdded(); + nRequests++; + if (nRequests == 2) { + secondRequestAdded.countDown(); + } + } + + @Override + protected void handleUnbound(Collection<AcceptorOperationFuture> unboundFutures) throws Exception { + super.handleUnbound(unboundFutures); + if (!unboundFutures.isEmpty() && nRequests == 1) { + secondRequestAdded.await(); + } + } + }; + acceptor.setCloseOnDeactivation(false); + acceptor.setReuseAddress(true); + acceptor.setHandler(new IoHandlerAdapter()); + try { + int port = AvailablePortFinder.getNextAvailable(1025); + InetSocketAddress address = new InetSocketAddress("127.0.0.1", port); + acceptor.bind(address); + acceptor.unbind(address); + acceptor.bind(address); + acceptor.unbind(address); + } finally { + acceptor.dispose(); + } + } +} \ No newline at end of file