This is an automated email from the ASF dual-hosted git repository.
elecharny pushed a commit to branch 2.2.X
in repository https://gitbox.apache.org/repos/asf/mina.git
The following commit(s) were added to refs/heads/2.2.X by this push:
new 36f4e9718 Applied patch for DIRMINA-1169
36f4e9718 is described below
commit 36f4e9718d43509d0182b153277ff186148660f3
Author: emmanuel lecharny <[email protected]>
AuthorDate: Sat May 20 06:42:18 2023 +0200
Applied patch for DIRMINA-1169
---
.../core/polling/AbstractPollingIoAcceptor.java | 96 ++++++++++++++--------
.../transport/socket/nio/NioSocketAcceptor.java | 14 +++-
.../transport/socket/nio/SocketAcceptorTest.java | 71 ++++++++++++++++
3 files changed, 144 insertions(+), 37 deletions(-)
diff --git
a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java
b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java
index 8ca46a9e9..476fc2c91 100644
---
a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java
+++
b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java
@@ -22,6 +22,8 @@ package org.apache.mina.core.polling;
import java.net.SocketAddress;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.spi.SelectorProvider;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -334,6 +336,13 @@ public abstract class AbstractPollingIoAcceptor<S extends
AbstractIoSession, H>
startupAcceptor();
wakeup();
}
+
+ /**
+ * Invoked when a bind request has been registered for processing. The
default implementation does nothing.
+ */
+ protected void bindRequestAdded() {
+ // Nothing
+ }
/**
* {@inheritDoc}
@@ -347,7 +356,8 @@ public abstract class AbstractPollingIoAcceptor<S extends
AbstractIoSession, H>
// adds the Registration request to the queue for the Workers
// to handle
registerQueue.add(request);
-
+ bindRequestAdded();
+
// creates the Acceptor instance and has the local
// executor kick it off.
startupAcceptor();
@@ -430,6 +440,52 @@ public abstract class AbstractPollingIoAcceptor<S extends
AbstractIoSession, H>
}
}
+ /**
+ * Handles new incoming connections by accepting the connections and
creating new sessions for them.
+ *
+ * @param handles the connection handles to accept and create new
sessions for
+ * @throws Exception on errors
+ */
+ @SuppressWarnings("unchecked")
+ protected void processHandles(Iterator<H> handles) throws Exception {
+ while (handles.hasNext()) {
+ H handle = handles.next();
+ handles.remove();
+
+ // Associates a new created connection to a processor,
+ // and get back a session
+ S session = accept(processor, handle);
+
+ if (session == null) {
+ continue;
+ }
+
+ initSession(session, null, null);
+
+ // add the session to the SocketIoProcessor
+ session.getProcessor().add(session);
+ }
+ }
+
+ /**
+ * Tells whether there are pending unbindings.
+ *
+ * @return {@code true} if there are any unbindings pending; {@code false}
otherwise
+ */
+ protected boolean hasUnbindings() {
+ return !cancelQueue.isEmpty();
+ }
+
+ /**
+ * Processes the futures for executed unbindings, marking all futures as
done.
+ *
+ * @param unboundFutures describing the unbindings
+ * @throws Exception on errors
+ */
+ protected void handleUnbound(Collection<AcceptorOperationFuture>
unboundFutures) throws Exception {
+ unboundFutures.forEach(AcceptorOperationFuture::setDone);
+ }
+
/**
* This class is called by the startupAcceptor() method and is
* placed into a NamePreservingRunnable class.
@@ -491,7 +547,9 @@ public abstract class AbstractPollingIoAcceptor<S extends
AbstractIoSession, H>
}
// check to see if any cancellation request has been made.
- nHandles -= unregisterHandles();
+ Collection<AcceptorOperationFuture> cancellations = new
ArrayList<>();
+ nHandles -= unregisterHandles(cancellations);
+ handleUnbound(cancellations);
} catch (ClosedSelectorException cse) {
// If the selector has been closed, we can exit the loop
ExceptionMonitor.getInstance().exceptionCaught(cse);
@@ -530,36 +588,6 @@ public abstract class AbstractPollingIoAcceptor<S extends
AbstractIoSession, H>
}
}
- /**
- * This method will process new sessions for the Worker class. All
- * keys that have had their status updates as per the
Selector.selectedKeys()
- * method will be processed here. Only keys that are ready to accept
- * connections are handled here.
- * <p/>
- * Session objects are created by making new instances of
SocketSessionImpl
- * and passing the session object to the SocketIoProcessor class.
- */
- @SuppressWarnings("unchecked")
- private void processHandles(Iterator<H> handles) throws Exception {
- while (handles.hasNext()) {
- H handle = handles.next();
- handles.remove();
-
- // Associates a new created connection to a processor,
- // and get back a session
- S session = accept(processor, handle);
-
- if (session == null) {
- continue;
- }
-
- initSession(session, null, null);
-
- // add the session to the SocketIoProcessor
- session.getProcessor().add(session);
- }
- }
-
/**
* Sets up the socket communications. Sets items such as:
* <p/>
@@ -628,7 +656,7 @@ public abstract class AbstractPollingIoAcceptor<S extends
AbstractIoSession, H>
* is CancellationRequest objects and the only place this happens is in
* the doUnbind() method.
*/
- private int unregisterHandles() {
+ private int unregisterHandles(Collection<AcceptorOperationFuture>
cancelled) {
int cancelledHandles = 0;
for (;;) {
AcceptorOperationFuture future = cancelQueue.poll();
@@ -654,7 +682,7 @@ public abstract class AbstractPollingIoAcceptor<S extends
AbstractIoSession, H>
}
}
- future.setDone();
+ cancelled.add(future);
}
return cancelledHandles;
diff --git
a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java
b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java
index 4881bb1e7..89253232e 100644
---
a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java
+++
b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java
@@ -148,6 +148,7 @@ implements SocketAcceptor {
/**
* {@inheritDoc}
*/
+ @Override
public TransportMetadata getTransportMetadata() {
return NioSocketSession.METADATA;
}
@@ -171,6 +172,7 @@ implements SocketAcceptor {
/**
* {@inheritDoc}
*/
+ @Override
public void setDefaultLocalAddress(InetSocketAddress localAddress) {
setDefaultLocalAddress((SocketAddress) localAddress);
}
@@ -180,7 +182,6 @@ implements SocketAcceptor {
*/
@Override
protected NioSession accept(IoProcessor<NioSession> processor,
ServerSocketChannel handle) throws Exception {
-
SelectionKey key = null;
if (handle != null) {
@@ -269,8 +270,12 @@ implements SocketAcceptor {
String newMessage = "Error while binding on " + localAddress;
Exception e = new IOException(newMessage, ioe);
- // And close the channel
- channel.close();
+ try {
+ // And close the channel
+ channel.close();
+ } catch (IOException nested) {
+ e.addSuppressed(nested);
+ }
throw e;
}
@@ -364,6 +369,7 @@ implements SocketAcceptor {
* @return <code>true</code> if there is at least one more
* SockectChannel object to read
*/
+ @Override
public boolean hasNext() {
return iterator.hasNext();
}
@@ -374,6 +380,7 @@ implements SocketAcceptor {
*
* @return The next SocketChannel in the iterator
*/
+ @Override
public ServerSocketChannel next() {
SelectionKey key = iterator.next();
@@ -387,6 +394,7 @@ implements SocketAcceptor {
/**
* Remove the current SocketChannel from the iterator
*/
+ @Override
public void remove() {
iterator.remove();
}
diff --git
a/mina-core/src/test/java/org/apache/mina/transport/socket/nio/SocketAcceptorTest.java
b/mina-core/src/test/java/org/apache/mina/transport/socket/nio/SocketAcceptorTest.java
new file mode 100644
index 000000000..3c09ccc87
--- /dev/null
+++
b/mina-core/src/test/java/org/apache/mina/transport/socket/nio/SocketAcceptorTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.mina.core.service.IoHandlerAdapter;
+import org.apache.mina.util.AvailablePortFinder;
+import org.junit.Test;
+
+public class SocketAcceptorTest {
+
+ @Test
+ public void testBindTwice() throws Exception {
+ NioSocketAcceptor acceptor = new NioSocketAcceptor() {
+
+ private int nRequests;
+
+ private CountDownLatch secondRequestAdded = new CountDownLatch(1);
+
+ @Override
+ protected void bindRequestAdded() {
+ super.bindRequestAdded();
+ nRequests++;
+ if (nRequests == 2) {
+ secondRequestAdded.countDown();
+ }
+ }
+
+ @Override
+ protected void handleUnbound(Collection<AcceptorOperationFuture>
unboundFutures) throws Exception {
+ super.handleUnbound(unboundFutures);
+ if (!unboundFutures.isEmpty() && nRequests == 1) {
+ secondRequestAdded.await();
+ }
+ }
+ };
+ acceptor.setCloseOnDeactivation(false);
+ acceptor.setReuseAddress(true);
+ acceptor.setHandler(new IoHandlerAdapter());
+ try {
+ int port = AvailablePortFinder.getNextAvailable(1025);
+ InetSocketAddress address = new InetSocketAddress("127.0.0.1",
port);
+ acceptor.bind(address);
+ acceptor.unbind(address);
+ acceptor.bind(address);
+ acceptor.unbind(address);
+ } finally {
+ acceptor.dispose();
+ }
+ }
+}
\ No newline at end of file