This is an automated email from the ASF dual-hosted git repository.

elecharny pushed a commit to branch 2.1.X
in repository https://gitbox.apache.org/repos/asf/mina.git


The following commit(s) were added to refs/heads/2.1.X by this push:
     new 6ab1017cd Backported patch for DIRMINA-1169
6ab1017cd is described below

commit 6ab1017cd0e50d125d4e6450cf42d48349126c84
Author: emmanuel lecharny <elecha...@apache.org>
AuthorDate: Mon May 22 09:24:55 2023 +0200

    Backported patch for DIRMINA-1169
---
 .../core/polling/AbstractPollingIoAcceptor.java    | 98 ++++++++++++++--------
 .../transport/socket/nio/NioSocketAcceptor.java    | 42 +++++++++-
 .../transport/socket/nio/SocketAcceptorTest.java   | 71 ++++++++++++++++
 3 files changed, 175 insertions(+), 36 deletions(-)

diff --git a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java
index 8ca46a9e9..3fc26993f 100644
--- a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java
+++ b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java
@@ -22,6 +22,8 @@ package org.apache.mina.core.polling;
 import java.net.SocketAddress;
 import java.nio.channels.ClosedSelectorException;
 import java.nio.channels.spi.SelectorProvider;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -335,6 +337,13 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
         wakeup();
     }
 
+    /**
+     * Invoked when a bind request has been registered for processing. The default implementation does nothing.
+     */
+    protected void bindRequestAdded() {
+        // Nothing
+    }
+
     /**
      * {@inheritDoc}
      */
@@ -347,6 +356,7 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
         // adds the Registration request to the queue for the Workers
         // to handle
         registerQueue.add(request);
+        bindRequestAdded();
 
         // creates the Acceptor instance and has the local
         // executor kick it off.
@@ -429,6 +439,54 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
             throw future.getException();
         }
     }
+    
+    /**
+     * Handles new incoming connections by accepting the connections and creating new sessions for them.
+     *
+     * @param  handles   the connection handles to accept and create new sessions for
+     * @throws Exception on errors
+     */
+    @SuppressWarnings("unchecked")
+    protected void processHandles(Iterator<H> handles) throws Exception {
+        while (handles.hasNext()) {
+            H handle = handles.next();
+            handles.remove();
+
+            // Associates a new created connection to a processor,
+            // and get back a session
+            S session = accept(processor, handle);
+
+            if (session == null) {
+                continue;
+            }
+
+            initSession(session, null, null);
+
+            // add the session to the SocketIoProcessor
+            session.getProcessor().add(session);
+        }
+    }
+
+    /**
+     * Tells whether there are pending unbindings.
+     *
+     * @return {@code true} if there are any unbindings pending; {@code false} otherwise
+     */
+    protected boolean hasUnbindings() {
+        return !cancelQueue.isEmpty();
+    }
+
+    /**
+     * Processes the futures for executed unbindings, marking all futures as done.
+     *
+     * @param  unboundFutures describing the unbindings
+     * @throws Exception      on errors
+     */
+    protected void handleUnbound(Collection<AcceptorOperationFuture> unboundFutures) throws Exception {
+        for (AcceptorOperationFuture unboundFuture:unboundFutures) {
+            unboundFuture.setDone();
+        }
+    }
 
     /**
      * This class is called by the startupAcceptor() method and is
@@ -489,9 +547,11 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
                         // them here.
                         processHandles(selectedHandles());
                     }
-
+                    
                     // check to see if any cancellation request has been made.
-                    nHandles -= unregisterHandles();
+                    Collection<AcceptorOperationFuture> cancellations = new ArrayList<>();
+                    nHandles -= unregisterHandles(cancellations);
+                    handleUnbound(cancellations);
                 } catch (ClosedSelectorException cse) {
                     // If the selector has been closed, we can exit the loop
                     ExceptionMonitor.getInstance().exceptionCaught(cse);
@@ -530,36 +590,6 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
             }
         }
 
-        /**
-         * This method will process new sessions for the Worker class.  All
-         * keys that have had their status updates as per the Selector.selectedKeys()
-         * method will be processed here.  Only keys that are ready to accept
-         * connections are handled here.
-         * <p/>
-         * Session objects are created by making new instances of SocketSessionImpl
-         * and passing the session object to the SocketIoProcessor class.
-         */
-        @SuppressWarnings("unchecked")
-        private void processHandles(Iterator<H> handles) throws Exception {
-            while (handles.hasNext()) {
-                H handle = handles.next();
-                handles.remove();
-
-                // Associates a new created connection to a processor,
-                // and get back a session
-                S session = accept(processor, handle);
-
-                if (session == null) {
-                    continue;
-                }
-
-                initSession(session, null, null);
-
-                // add the session to the SocketIoProcessor
-                session.getProcessor().add(session);
-            }
-        }
-
         /**
          * Sets up the socket communications.  Sets items such as:
          * <p/>
@@ -628,7 +658,7 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
          * is CancellationRequest objects and the only place this happens is in
          * the doUnbind() method.
          */
-        private int unregisterHandles() {
+        private int unregisterHandles(Collection<AcceptorOperationFuture> cancelled) {
             int cancelledHandles = 0;
             for (;;) {
                 AcceptorOperationFuture future = cancelQueue.poll();
@@ -654,7 +684,7 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
                     }
                 }
 
-                future.setDone();
+                cancelled.add(future);
             }
 
             return cancelledHandles;
diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java
index fc672b09d..836326609 100644
--- a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java
+++ b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java
@@ -135,6 +135,35 @@ implements SocketAcceptor {
         }
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected void handleUnbound(Collection<AcceptorOperationFuture> unboundFutures) throws Exception {
+        // If we're on Java >= 11, unbindings may take effect only on the next select()
+        // TODO: add a check (java.specification.version?) to do this only on a JVM >= 11?
+        if (!unboundFutures.isEmpty()) {
+            int selected = 0;
+            try {
+                // Simply select() would also work since wakeup() *was* called, but let's be explicit.
+                selected = selector.selectNow();
+            } finally {
+                super.handleUnbound(unboundFutures); // Marks the futures as done
+                if (hasUnbindings()) {
+                    // Depending on when these new unbindings were added, their wakeup() call may just have been
+                    // cancelled by the above select. Re-instate it, so that the next select will not block, as
+                    // expected.
+                    wakeup();
+                }
+            }
+            if (selected > 0) {
+                processHandles(selectedHandles());
+            }
+        } else {
+            super.handleUnbound(unboundFutures);
+        }
+    }
+
     /**
      * {@inheritDoc}
      */
@@ -148,6 +177,7 @@ implements SocketAcceptor {
     /**
      * {@inheritDoc}
      */
+    @Override
     public TransportMetadata getTransportMetadata() {
         return NioSocketSession.METADATA;
     }
@@ -171,6 +201,7 @@ implements SocketAcceptor {
     /**
      * {@inheritDoc}
      */
+    @Override
     public void setDefaultLocalAddress(InetSocketAddress localAddress) {
         setDefaultLocalAddress((SocketAddress) localAddress);
     }
@@ -269,8 +300,12 @@ implements SocketAcceptor {
                 String newMessage = "Error while binding on " + localAddress;
                 Exception e = new IOException(newMessage, ioe);
 
-                // And close the channel
-                channel.close();
+                try {
+                    // And close the channel
+                    channel.close();
+                } catch (IOException nested) {
+                    e.addSuppressed(nested);
+                }
 
                 throw e;
             }
@@ -364,6 +399,7 @@ implements SocketAcceptor {
          * @return <tt>true</tt> if there is at least one more
          * SockectChannel object to read
          */
+        @Override
         public boolean hasNext() {
             return iterator.hasNext();
         }
@@ -374,6 +410,7 @@ implements SocketAcceptor {
          * 
          * @return The next SocketChannel in the iterator
          */
+        @Override
         public ServerSocketChannel next() {
             SelectionKey key = iterator.next();
 
@@ -387,6 +424,7 @@ implements SocketAcceptor {
         /**
          * Remove the current SocketChannel from the iterator
          */
+        @Override
         public void remove() {
             iterator.remove();
         }
diff --git a/mina-core/src/test/java/org/apache/mina/transport/socket/nio/SocketAcceptorTest.java b/mina-core/src/test/java/org/apache/mina/transport/socket/nio/SocketAcceptorTest.java
new file mode 100644
index 000000000..3c09ccc87
--- /dev/null
+++ b/mina-core/src/test/java/org/apache/mina/transport/socket/nio/SocketAcceptorTest.java
@@ -0,0 +1,71 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.mina.core.service.IoHandlerAdapter;
+import org.apache.mina.util.AvailablePortFinder;
+import org.junit.Test;
+
+public class SocketAcceptorTest {
+
+    @Test
+    public void testBindTwice() throws Exception {
+        NioSocketAcceptor acceptor = new NioSocketAcceptor() {
+
+            private int nRequests;
+
+            private CountDownLatch secondRequestAdded = new CountDownLatch(1);
+
+            @Override
+            protected void bindRequestAdded() {
+                super.bindRequestAdded();
+                nRequests++;
+                if (nRequests == 2) {
+                    secondRequestAdded.countDown();
+                }
+            }
+
+            @Override
+            protected void handleUnbound(Collection<AcceptorOperationFuture> unboundFutures) throws Exception {
+                super.handleUnbound(unboundFutures);
+                if (!unboundFutures.isEmpty() && nRequests == 1) {
+                    secondRequestAdded.await();
+                }
+            }
+        };
+        acceptor.setCloseOnDeactivation(false);
+        acceptor.setReuseAddress(true);
+        acceptor.setHandler(new IoHandlerAdapter());
+        try {
+            int port = AvailablePortFinder.getNextAvailable(1025);
+            InetSocketAddress address = new InetSocketAddress("127.0.0.1", port);
+            acceptor.bind(address);
+            acceptor.unbind(address);
+            acceptor.bind(address);
+            acceptor.unbind(address);
+        } finally {
+            acceptor.dispose();
+        }
+    }
+}
\ No newline at end of file

Reply via email to