Repository: mina Updated Branches: refs/heads/2.0 09b337521 -> bf0254f34
Replaced the synchronized selector, using a RW lock instead, to offer a safe synchronization (DIRMINA-1059) Project: http://git-wip-us.apache.org/repos/asf/mina/repo Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/bf0254f3 Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/bf0254f3 Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/bf0254f3 Branch: refs/heads/2.0 Commit: bf0254f34612af818f37305d76fc4f711be5de55 Parents: 09b3375 Author: Emmanuel Lécharny <elecha...@symas.com> Authored: Tue Dec 6 08:40:46 2016 +0100 Committer: Emmanuel Lécharny <elecha...@symas.com> Committed: Tue Dec 6 08:40:46 2016 +0100 ---------------------------------------------------------------------- .../mina/transport/socket/nio/NioProcessor.java | 90 ++++++++++++++++---- 1 file changed, 74 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina/blob/bf0254f3/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java ---------------------------------------------------------------------- diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java index 8202e18..3b0fa40 100644 --- a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java +++ b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java @@ -30,6 +30,8 @@ import java.nio.channels.spi.SelectorProvider; import java.util.Iterator; import java.util.Set; import java.util.concurrent.Executor; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.mina.core.RuntimeIoException; import org.apache.mina.core.buffer.IoBuffer; @@ -38,13 +40,16 @@ import org.apache.mina.core.polling.AbstractPollingIoProcessor; import org.apache.mina.core.session.SessionState; /** - * TODO Add documentation + * A processor for incoming and outgoing data get and written on a TCP socket. * * @author <a href="http://mina.apache.org">Apache MINA Project</a> */ public final class NioProcessor extends AbstractPollingIoProcessor<NioSession> { /** The selector associated with this processor */ private Selector selector; + + /** A lock used to protect concurent access to the selector */ + private ReadWriteLock selectorLock = new ReentrantReadWriteLock(); private SelectorProvider selectorProvider = null; @@ -80,9 +85,9 @@ public final class NioProcessor extends AbstractPollingIoProcessor<NioSession> { if (selectorProvider == null) { selector = Selector.open(); } else { + this.selectorProvider = selectorProvider; selector = selectorProvider.openSelector(); } - } catch (IOException e) { throw new RuntimeIoException("Failed to open a selector.", e); } @@ -90,33 +95,69 @@ public final class NioProcessor extends AbstractPollingIoProcessor<NioSession> { @Override protected void doDispose() throws Exception { - selector.close(); + selectorLock.readLock().lock(); + + try { + selector.close(); + } finally { + selectorLock.readLock().unlock(); + } } @Override protected int select(long timeout) throws Exception { - return selector.select(timeout); + selectorLock.readLock().lock(); + + try { + return selector.select(timeout); + } finally { + selectorLock.readLock().unlock(); + } } @Override protected int select() throws Exception { - return selector.select(); + selectorLock.readLock().lock(); + + try { + return selector.select(); + } finally { + selectorLock.readLock().unlock(); + } } @Override protected boolean isSelectorEmpty() { - return selector.keys().isEmpty(); + selectorLock.readLock().lock(); + + try { + return selector.keys().isEmpty(); + } finally { + selectorLock.readLock().unlock(); + } } @Override protected void wakeup() { wakeupCalled.getAndSet(true); - selector.wakeup(); + selectorLock.readLock().lock(); + + try { + selector.wakeup(); + } finally { + selectorLock.readLock().unlock(); + } } @Override protected Iterator<NioSession> allSessions() { - return new IoSessionIterator(selector.keys()); + selectorLock.readLock().lock(); + + try { + return new IoSessionIterator(selector.keys()); + } finally { + selectorLock.readLock().unlock(); + } } @SuppressWarnings("synthetic-access") @@ -129,7 +170,13 @@ public final class NioProcessor extends AbstractPollingIoProcessor<NioSession> { protected void init(NioSession session) throws Exception { SelectableChannel ch = (SelectableChannel) session.getChannel(); ch.configureBlocking(false); - session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session)); + selectorLock.readLock().lock(); + + try { + session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session)); + } finally { + selectorLock.readLock().unlock(); + } } @Override @@ -154,12 +201,13 @@ public final class NioProcessor extends AbstractPollingIoProcessor<NioSession> { */ @Override protected void registerNewSelector() throws IOException { - synchronized (selector) { + selectorLock.writeLock().lock(); + + try { Set<SelectionKey> keys = selector.keys(); + Selector newSelector; // Open a new selector - Selector newSelector = null; - if (selectorProvider == null) { newSelector = Selector.open(); } else { @@ -179,7 +227,10 @@ public final class NioProcessor extends AbstractPollingIoProcessor<NioSession> { // Now we can close the old selector and switch it selector.close(); selector = newSelector; + } finally { + selectorLock.writeLock().unlock(); } + } /** @@ -190,7 +241,9 @@ public final class NioProcessor extends AbstractPollingIoProcessor<NioSession> { // A flag set to true if we find a broken session boolean brokenSession = false; - synchronized (selector) { + selectorLock.readLock().lock(); + + try { // Get the selector keys Set<SelectionKey> keys = selector.keys(); @@ -199,7 +252,7 @@ public final class NioProcessor extends AbstractPollingIoProcessor<NioSession> { for (SelectionKey key : keys) { SelectableChannel channel = key.channel(); - if ((((channel instanceof DatagramChannel) && !((DatagramChannel) channel).isConnected())) + if (((channel instanceof DatagramChannel) && !((DatagramChannel) channel).isConnected()) || ((channel instanceof SocketChannel) && !((SocketChannel) channel).isConnected())) { // The channel is not connected anymore. Cancel // the associated key then. @@ -209,6 +262,8 @@ public final class NioProcessor extends AbstractPollingIoProcessor<NioSession> { brokenSession = true; } } + } finally { + selectorLock.readLock().unlock(); } return brokenSession; @@ -368,6 +423,7 @@ public final class NioProcessor extends AbstractPollingIoProcessor<NioSession> { /** * {@inheritDoc} */ + @Override public boolean hasNext() { return iterator.hasNext(); } @@ -375,15 +431,17 @@ public final class NioProcessor extends AbstractPollingIoProcessor<NioSession> { /** * {@inheritDoc} */ + @Override public NioSession next() { SelectionKey key = iterator.next(); - NioSession nioSession = (NioSession) key.attachment(); - return nioSession; + + return (NioSession) key.attachment(); } /** * {@inheritDoc} */ + @Override public void remove() { iterator.remove(); }