This is an automated email from the ASF dual-hosted git repository. tibordigana pushed a commit to branch maven2surefire-jvm-communication in repository https://gitbox.apache.org/repos/asf/maven-surefire.git
The following commit(s) were added to refs/heads/maven2surefire-jvm-communication by this push: new e0f5f08 fix after Enrico's findings in external project e0f5f08 is described below commit e0f5f081f216872bb56a7ee14e5cdc29a6209101 Author: tibordigana <tibordig...@apache.org> AuthorDate: Sun Mar 22 01:47:34 2020 +0100 fix after Enrico's findings in external project --- .../plugin/surefire/booterclient/ForkStarter.java | 4 +- .../AbstractNoninterruptibleReadableChannel.java | 69 ++++++ .../AbstractNoninterruptibleWritableChannel.java | 93 ++++++++ .../maven/surefire/util/internal/Channels.java | 122 ++++++++++ .../java/org/apache/maven/JUnit4SuiteTest.java | 6 +- .../surefire/util/internal/ChannelsReaderTest.java | 250 +++++++++++++++++++++ .../surefire/util/internal/ChannelsWriterTest.java | 171 ++++++++++++++ .../apache/maven/surefire/booter/ForkedBooter.java | 3 + .../spi/LegacyMasterProcessChannelEncoder.java | 36 +-- ...LegacyMasterProcessChannelProcessorFactory.java | 5 +- .../extensions/util/CommandlineStreams.java | 6 +- .../util/FlushableWritableByteChannel.java | 68 ------ 12 files changed, 742 insertions(+), 91 deletions(-) diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/ForkStarter.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/ForkStarter.java index a41384a..b27dacb 100644 --- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/ForkStarter.java +++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/ForkStarter.java @@ -637,9 +637,9 @@ public class ForkStarter out = forkChannel.bindEventHandler( eventConsumer, countdownCloseable, streams.getStdOutChannel() ); out.start(); - EventHandler<String> stdErrConsumer = new NativeStdErrStreamConsumer( reporter ); + EventHandler<String> errConsumer = new NativeStdErrStreamConsumer( reporter ); err = new LineConsumerThread( "fork-" + forkNumber + "-err-thread-", streams.getStdErrChannel(), - stdErrConsumer, countdownCloseable ); + errConsumer, countdownCloseable ); err.start(); result = exec.awaitExit(); diff --git a/surefire-api/src/main/java/org/apache/maven/surefire/util/internal/AbstractNoninterruptibleReadableChannel.java b/surefire-api/src/main/java/org/apache/maven/surefire/util/internal/AbstractNoninterruptibleReadableChannel.java new file mode 100644 index 0000000..1a75972 --- /dev/null +++ b/surefire-api/src/main/java/org/apache/maven/surefire/util/internal/AbstractNoninterruptibleReadableChannel.java @@ -0,0 +1,69 @@ +package org.apache.maven.surefire.util.internal; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.NonReadableChannelException; +import java.nio.channels.ReadableByteChannel; + +/** + * The channel used for reads which cannot be implicitly closed after the operational Thread + * is {@link Thread#isInterrupted() interrupted}. + * + * @since 3.0.0-M5 + */ +abstract class AbstractNoninterruptibleReadableChannel implements ReadableByteChannel +{ + private volatile boolean open = true; + + protected abstract int readImpl( ByteBuffer src ) throws IOException; + protected abstract void closeImpl() throws IOException; + + @Override + public final int read( ByteBuffer src ) throws IOException + { + if ( !isOpen() ) + { + throw new ClosedChannelException(); + } + + if ( !src.hasArray() || src.isReadOnly() ) + { + throw new NonReadableChannelException(); + } + + return src.hasRemaining() ? readImpl( src ) : 0; + } + + @Override + public final boolean isOpen() + { + return open; + } + + @Override + public final void close() throws IOException + { + open = false; + closeImpl(); + } +} diff --git a/surefire-api/src/main/java/org/apache/maven/surefire/util/internal/AbstractNoninterruptibleWritableChannel.java b/surefire-api/src/main/java/org/apache/maven/surefire/util/internal/AbstractNoninterruptibleWritableChannel.java new file mode 100644 index 0000000..cb08e34 --- /dev/null +++ b/surefire-api/src/main/java/org/apache/maven/surefire/util/internal/AbstractNoninterruptibleWritableChannel.java @@ -0,0 +1,93 @@ +package org.apache.maven.surefire.util.internal; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.Flushable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.NonWritableChannelException; +import java.nio.channels.WritableByteChannel; + +/** + * The channel used for writes which cannot be implicitly closed after the operational Thread + * is {@link Thread#isInterrupted() interrupted}. + * + * @since 3.0.0-M5 + */ +abstract class AbstractNoninterruptibleWritableChannel implements WritableByteChannel, Flushable +{ + private final boolean flushable; + private volatile boolean open = true; + + AbstractNoninterruptibleWritableChannel( boolean flushable ) + { + this.flushable = flushable; + } + + protected abstract void writeImpl( ByteBuffer src ) throws IOException; + protected abstract void closeImpl() throws IOException; + + @Override + public final synchronized int write( ByteBuffer src ) throws IOException + { + if ( !isOpen() ) + { + throw new ClosedChannelException(); + } + + if ( !src.hasArray() || src.isReadOnly() ) + { + throw new NonWritableChannelException(); + } + + if ( src.remaining() != src.capacity() ) + { + src.flip(); + } + + int countWrittenBytes = 0; + + if ( src.hasRemaining() ) + { + countWrittenBytes = src.remaining(); + writeImpl( src ); + src.position( src.limit() ); + if ( flushable ) + { + flush(); + } + } + return countWrittenBytes; + } + + @Override + public final boolean isOpen() + { + return open; + } + + @Override + public final void close() throws IOException + { + open = false; + closeImpl(); + } +} diff --git a/surefire-api/src/main/java/org/apache/maven/surefire/util/internal/Channels.java b/surefire-api/src/main/java/org/apache/maven/surefire/util/internal/Channels.java new file mode 100644 index 0000000..65cb9b4 --- /dev/null +++ b/surefire-api/src/main/java/org/apache/maven/surefire/util/internal/Channels.java @@ -0,0 +1,122 @@ +package org.apache.maven.surefire.util.internal; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import javax.annotation.Nonnull; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; + +import static java.util.Objects.requireNonNull; + +/** + * Converts {@link OutputStream}, {@link java.io.PrintStream}, {@link InputStream} + * to the Java {@link java.nio.channels.Channel}. + * <br> + * We do not use the Java's utility class {@link java.nio.channels.Channels} because the utility + * closes the stream as soon as the particular Thread is interrupted. + * If the frameworks (Zookeeper, Netty) interrupts the thread, the communication channels become + * closed and the JVM hangs. Therefore we developed internal utility which is safe for the Surefire. + * + * @since 3.0.0-M5 + */ +public final class Channels +{ + private Channels() + { + throw new IllegalStateException( "no instantiable constructor" ); + } + + public static WritableByteChannel newChannel( @Nonnull OutputStream out ) + { + return newChannel( out, false ); + } + + public static WritableByteChannel newFlushableChannel( @Nonnull OutputStream out ) + { + return newChannel( out, true ); + } + + public static ReadableByteChannel newChannel( @Nonnull final InputStream is ) + { + requireNonNull( is, "the stream should not be null" ); + + if ( is instanceof FileInputStream && FileInputStream.class.equals( is.getClass() ) ) + { + return ( (FileInputStream) is ).getChannel(); + } + + return new AbstractNoninterruptibleReadableChannel() + { + @Override + protected int readImpl( ByteBuffer src ) throws IOException + { + int count = is.read( src.array(), src.arrayOffset() + src.position(), src.remaining() ); + if ( count > 0 ) + { + src.position( count + src.position() ); + } + return count; + } + + @Override + protected void closeImpl() throws IOException + { + is.close(); + } + }; + } + + private static WritableByteChannel newChannel( @Nonnull final OutputStream out, final boolean flushable ) + { + requireNonNull( out, "the stream should not be null" ); + + if ( out instanceof FileOutputStream && FileOutputStream.class.equals( out.getClass() ) ) + { + return ( (FileOutputStream) out ).getChannel(); + } + + return new AbstractNoninterruptibleWritableChannel( flushable ) + { + @Override + protected void writeImpl( ByteBuffer src ) throws IOException + { + out.write( src.array(), src.arrayOffset() + src.position(), src.remaining() ); + } + + @Override + protected void closeImpl() throws IOException + { + out.close(); + } + + @Override + public void flush() throws IOException + { + out.flush(); + } + }; + } +} diff --git a/surefire-api/src/test/java/org/apache/maven/JUnit4SuiteTest.java b/surefire-api/src/test/java/org/apache/maven/JUnit4SuiteTest.java index 66a95a6..650807f 100644 --- a/surefire-api/src/test/java/org/apache/maven/JUnit4SuiteTest.java +++ b/surefire-api/src/test/java/org/apache/maven/JUnit4SuiteTest.java @@ -35,6 +35,8 @@ import org.apache.maven.surefire.util.RunOrderCalculatorTest; import org.apache.maven.surefire.util.RunOrderTest; import org.apache.maven.surefire.util.ScanResultTest; import org.apache.maven.surefire.util.TestsToRunTest; +import org.apache.maven.surefire.util.internal.ChannelsReaderTest; +import org.apache.maven.surefire.util.internal.ChannelsWriterTest; import org.apache.maven.surefire.util.internal.ConcurrencyUtilsTest; import org.apache.maven.surefire.util.internal.ImmutableMapTest; import org.junit.runner.RunWith; @@ -62,7 +64,9 @@ import org.junit.runners.Suite; SpecificTestClassFilterTest.class, FundamentalFilterTest.class, ImmutableMapTest.class, - ReflectionUtilsTest.class + ReflectionUtilsTest.class, + ChannelsReaderTest.class, + ChannelsWriterTest.class } ) @RunWith( Suite.class ) public class JUnit4SuiteTest diff --git a/surefire-api/src/test/java/org/apache/maven/surefire/util/internal/ChannelsReaderTest.java b/surefire-api/src/test/java/org/apache/maven/surefire/util/internal/ChannelsReaderTest.java new file mode 100644 index 0000000..7e3a685 --- /dev/null +++ b/surefire-api/src/test/java/org/apache/maven/surefire/util/internal/ChannelsReaderTest.java @@ -0,0 +1,250 @@ +package org.apache.maven.surefire.util.internal; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.NonReadableChannelException; +import java.nio.channels.ReadableByteChannel; + +import static java.nio.file.Files.write; +import static org.fest.assertions.Assertions.assertThat; + +/** + * The tests for {@link Channels#newChannel(InputStream)}. + */ +public class ChannelsReaderTest +{ + @Rule + public final ExpectedException ee = ExpectedException.none(); + + @Rule + public final TemporaryFolder tmp = TemporaryFolder.builder() + .assureDeletion() + .build(); + + @Test + public void exactBufferSize() throws Exception + { + ByteArrayInputStream is = new ByteArrayInputStream( new byte[] {1, 2, 3} ); + ReadableByteChannel channel = Channels.newChannel( is ); + ByteBuffer bb = ByteBuffer.allocate( 3 ); + + int countWritten = channel.read( bb ); + + assertThat( countWritten ) + .isEqualTo( 3 ); + + assertThat( bb.arrayOffset() ) + .isEqualTo( 0 ); + + assertThat( bb.position() ) + .isEqualTo( 3 ); + + assertThat( bb.remaining() ) + .isEqualTo( 0 ); + + assertThat( bb.limit() ) + .isEqualTo( 3 ); + + assertThat( bb.capacity() ) + .isEqualTo( 3 ); + + bb.flip(); + + assertThat( bb.arrayOffset() ) + .isEqualTo( 0 ); + + assertThat( bb.position() ) + .isEqualTo( 0 ); + + assertThat( bb.remaining() ) + .isEqualTo( 3 ); + + assertThat( bb.limit() ) + .isEqualTo( 3 ); + + assertThat( bb.capacity() ) + .isEqualTo( 3 ); + + assertThat( bb.array() ) + .isEqualTo( new byte[] {1, 2, 3} ); + + assertThat( channel.isOpen() ) + .isTrue(); + + channel.close(); + + assertThat( channel.isOpen() ) + .isFalse(); + } + + @Test + public void biggerBuffer() throws Exception + { + ByteArrayInputStream is = new ByteArrayInputStream( new byte[] {1, 2, 3} ); + ReadableByteChannel channel = Channels.newChannel( is ); + ByteBuffer bb = ByteBuffer.allocate( 4 ); + + int countWritten = channel.read( bb ); + + assertThat( countWritten ) + .isEqualTo( 3 ); + + assertThat( bb.arrayOffset() ) + .isEqualTo( 0 ); + + assertThat( bb.position() ) + .isEqualTo( 3 ); + + assertThat( bb.remaining() ) + .isEqualTo( 1 ); + + assertThat( bb.limit() ) + .isEqualTo( 4 ); + + assertThat( bb.capacity() ) + .isEqualTo( 4 ); + + bb.flip(); + + assertThat( bb.arrayOffset() ) + .isEqualTo( 0 ); + + assertThat( bb.position() ) + .isEqualTo( 0 ); + + assertThat( bb.remaining() ) + .isEqualTo( 3 ); + + assertThat( bb.limit() ) + .isEqualTo( 3 ); + + assertThat( bb.capacity() ) + .isEqualTo( 4 ); + + assertThat( bb.array() ) + .isEqualTo( new byte[] {1, 2, 3, 0} ); + + assertThat( channel.isOpen() ) + .isTrue(); + + channel.close(); + + assertThat( channel.isOpen() ) + .isFalse(); + } + + @Test + public void shouldFailAfterClosed() throws IOException + { + ByteArrayInputStream is = new ByteArrayInputStream( new byte[] {1, 2, 3} ); + ReadableByteChannel channel = Channels.newChannel( is ); + channel.close(); + assertThat( channel.isOpen() ).isFalse(); + ee.expect( ClosedChannelException.class ); + channel.read( ByteBuffer.allocate( 0 ) ); + } + + @Test + public void shouldFailIfNotReadable() throws IOException + { + ByteArrayInputStream is = new ByteArrayInputStream( new byte[] {1, 2, 3} ); + ReadableByteChannel channel = Channels.newChannel( is ); + ee.expect( NonReadableChannelException.class ); + channel.read( ByteBuffer.allocate( 0 ).asReadOnlyBuffer() ); + } + + @Test + public void shouldFailIOnDirectBuffer() throws IOException + { + ByteArrayInputStream is = new ByteArrayInputStream( new byte[] {1, 2, 3} ); + ReadableByteChannel channel = Channels.newChannel( is ); + ee.expect( NonReadableChannelException.class ); + channel.read( ByteBuffer.allocateDirect( 0 ) ); + } + + @Test + public void shouldUseFileChannel() throws IOException + { + File f = tmp.newFile(); + write( f.toPath(), new byte[] {1, 2, 3} ); + FileInputStream is = new FileInputStream( f ); + ReadableByteChannel channel = Channels.newChannel( is ); + ByteBuffer bb = ByteBuffer.allocate( 4 ); + int countWritten = channel.read( bb ); + + assertThat( channel.isOpen() ) + .isTrue(); + + channel.close(); + + assertThat( channel.isOpen() ) + .isFalse(); + + assertThat( countWritten ) + .isEqualTo( 3 ); + + assertThat( bb.arrayOffset() ) + .isEqualTo( 0 ); + + assertThat( bb.position() ) + .isEqualTo( 3 ); + + assertThat( bb.remaining() ) + .isEqualTo( 1 ); + + assertThat( bb.limit() ) + .isEqualTo( 4 ); + + assertThat( bb.capacity() ) + .isEqualTo( 4 ); + + bb.flip(); + + assertThat( bb.arrayOffset() ) + .isEqualTo( 0 ); + + assertThat( bb.position() ) + .isEqualTo( 0 ); + + assertThat( bb.remaining() ) + .isEqualTo( 3 ); + + assertThat( bb.limit() ) + .isEqualTo( 3 ); + + assertThat( bb.capacity() ) + .isEqualTo( 4 ); + + assertThat( bb.array() ) + .isEqualTo( new byte[] {1, 2, 3, 0} ); + } +} diff --git a/surefire-api/src/test/java/org/apache/maven/surefire/util/internal/ChannelsWriterTest.java b/surefire-api/src/test/java/org/apache/maven/surefire/util/internal/ChannelsWriterTest.java new file mode 100644 index 0000000..a1f6a66 --- /dev/null +++ b/surefire-api/src/test/java/org/apache/maven/surefire/util/internal/ChannelsWriterTest.java @@ -0,0 +1,171 @@ +package org.apache.maven.surefire.util.internal; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.NonWritableChannelException; +import java.nio.channels.WritableByteChannel; + +import static java.nio.file.Files.readAllBytes; +import static org.fest.assertions.Assertions.assertThat; + +/** + * The tests for {@link Channels#newChannel(OutputStream)} and {@link Channels#newFlushableChannel(OutputStream)}. + */ +public class ChannelsWriterTest +{ + @Rule + public final ExpectedException ee = ExpectedException.none(); + + @Rule + public final TemporaryFolder tmp = TemporaryFolder.builder() + .assureDeletion() + .build(); + + @Test + public void wrappedBuffer() throws Exception + { + final boolean[] isFlush = {false}; + ByteArrayOutputStream out = new ByteArrayOutputStream() + { + @Override + public void flush() throws IOException + { + isFlush[0] = true; + super.flush(); + } + }; + WritableByteChannel channel = Channels.newFlushableChannel( out ); + ByteBuffer bb = ByteBuffer.wrap( new byte[] {1, 2, 3} ); + int countWritten = channel.write( bb ); + assertThat( countWritten ) + .isEqualTo( 3 ); + + assertThat( out.toByteArray() ) + .hasSize( 3 ) + .isEqualTo( new byte[] {1, 2, 3} ); + + assertThat( isFlush ) + .hasSize( 1 ) + .containsOnly( true ); + + assertThat( bb.position() ) + .isEqualTo( 3 ); + + assertThat( bb.limit() ) + .isEqualTo( 3 ); + + assertThat( bb.capacity() ) + .isEqualTo( 3 ); + + assertThat( channel.isOpen() ) + .isTrue(); + } + + @Test + public void bigBuffer() throws Exception + { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + WritableByteChannel channel = Channels.newChannel( out ); + ByteBuffer bb = ByteBuffer.allocate( 4 ); + bb.put( (byte) 1 ); + bb.put( (byte) 2 ); + bb.put( (byte) 3 ); + int countWritten = channel.write( bb ); + assertThat( countWritten ).isEqualTo( 3 ); + assertThat( out.toByteArray() ) + .hasSize( 3 ) + .isEqualTo( new byte[] {1, 2, 3} ); + + assertThat( bb.position() ) + .isEqualTo( 3 ); + + assertThat( bb.limit() ) + .isEqualTo( 3 ); + + assertThat( bb.capacity() ) + .isEqualTo( 4 ); + + assertThat( channel.isOpen() ) + .isTrue(); + } + + @Test + public void shouldFailAfterClosed() throws IOException + { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + WritableByteChannel channel = Channels.newChannel( out ); + channel.close(); + assertThat( channel.isOpen() ).isFalse(); + ee.expect( ClosedChannelException.class ); + channel.write( ByteBuffer.allocate( 0 ) ); + } + + @Test + public void shouldFailIfNotReadable() throws IOException + { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + WritableByteChannel channel = Channels.newChannel( out ); + ee.expect( NonWritableChannelException.class ); + channel.write( ByteBuffer.allocate( 0 ).asReadOnlyBuffer() ); + } + + @Test + public void shouldFailIOnDirectBuffer() throws IOException + { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + WritableByteChannel channel = Channels.newChannel( out ); + ee.expect( NonWritableChannelException.class ); + channel.write( ByteBuffer.allocateDirect( 0 ) ); + } + + @Test + public void shouldUseFileChannel() throws IOException + { + File f = tmp.newFile(); + FileOutputStream os = new FileOutputStream( f ); + WritableByteChannel channel = Channels.newChannel( os ); + ByteBuffer bb = ByteBuffer.wrap( new byte[] {1, 2, 3} ); + channel.write( bb ); + + assertThat( channel.isOpen() ) + .isTrue(); + + channel.close(); + + assertThat( channel.isOpen() ) + .isFalse(); + + assertThat( readAllBytes( f.toPath() ) ) + .hasSize( 3 ) + .isEqualTo( new byte[] {1, 2, 3} ); + } +} diff --git a/surefire-booter/src/main/java/org/apache/maven/surefire/booter/ForkedBooter.java b/surefire-booter/src/main/java/org/apache/maven/surefire/booter/ForkedBooter.java index 34b752e..46d9d74 100644 --- a/surefire-booter/src/main/java/org/apache/maven/surefire/booter/ForkedBooter.java +++ b/surefire-booter/src/main/java/org/apache/maven/surefire/booter/ForkedBooter.java @@ -382,6 +382,9 @@ public final class ForkedBooter private void acknowledgedExit() { + //noinspection ResultOfMethodCallIgnored + Thread.interrupted(); + commandReader.addByeAckListener( new CommandListener() { @Override diff --git a/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/LegacyMasterProcessChannelEncoder.java b/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/LegacyMasterProcessChannelEncoder.java index a9bd41a..a85e4ef 100644 --- a/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/LegacyMasterProcessChannelEncoder.java +++ b/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/LegacyMasterProcessChannelEncoder.java @@ -289,23 +289,27 @@ public class LegacyMasterProcessChannelEncoder implements MasterProcessChannelEn private void encodeAndPrintEvent( StringBuilder event ) { - byte[] array = event.append( '\n' ).toString().getBytes( STREAM_ENCODING ); - synchronized ( out ) + try { - try - { - out.write( ByteBuffer.wrap( array ) ); - } - catch ( ClosedChannelException e ) - { - DumpErrorSingleton.getSingleton() - .dumpText( "Channel closed while writing the event '" + event + "'." ); - } - catch ( IOException e ) - { - DumpErrorSingleton.getSingleton().dumpException( e ); - trouble = true; - } + //noinspection ResultOfMethodCallIgnored + Thread.interrupted(); + + byte[] array = event.append( '\n' ) + .toString() + .getBytes( STREAM_ENCODING ); + + out.write( ByteBuffer.wrap( array ) ); + } + catch ( ClosedChannelException e ) + { + DumpErrorSingleton.getSingleton() + .dumpException( e, "Channel closed while writing the event '" + event + "'." ); + } + catch ( IOException e ) + { + DumpErrorSingleton.getSingleton() + .dumpException( e ); + trouble = true; } } diff --git a/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/LegacyMasterProcessChannelProcessorFactory.java b/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/LegacyMasterProcessChannelProcessorFactory.java index c6e11da..5713b99 100644 --- a/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/LegacyMasterProcessChannelProcessorFactory.java +++ b/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/LegacyMasterProcessChannelProcessorFactory.java @@ -26,7 +26,8 @@ import org.apache.maven.surefire.spi.MasterProcessChannelProcessorFactory; import java.io.IOException; import java.net.MalformedURLException; -import static java.nio.channels.Channels.newChannel; +import static org.apache.maven.surefire.util.internal.Channels.newChannel; +import static org.apache.maven.surefire.util.internal.Channels.newFlushableChannel; /** * Producer of encoder and decoder for process pipes. @@ -62,7 +63,7 @@ public class LegacyMasterProcessChannelProcessorFactory @Override public MasterProcessChannelEncoder createEncoder() { - return new LegacyMasterProcessChannelEncoder( newChannel( System.out ) ); + return new LegacyMasterProcessChannelEncoder( newFlushableChannel( System.out ) ); } @Override diff --git a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/util/CommandlineStreams.java b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/util/CommandlineStreams.java index 43ec328..18dae45 100644 --- a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/util/CommandlineStreams.java +++ b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/util/CommandlineStreams.java @@ -27,8 +27,8 @@ import java.nio.channels.Channel; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; -import static java.nio.channels.Channels.newChannel; -import static org.apache.maven.surefire.extensions.util.FlushableWritableByteChannel.newFlushableChannel; +import static org.apache.maven.surefire.util.internal.Channels.newChannel; +import static org.apache.maven.surefire.util.internal.Channels.newFlushableChannel; /** * @@ -44,8 +44,10 @@ public final class CommandlineStreams implements Closeable { InputStream stdOutStream = process.getInputStream(); stdOutChannel = newChannel( stdOutStream ); + InputStream stdErrStream = process.getErrorStream(); stdErrChannel = newChannel( stdErrStream ); + stdInChannel = newFlushableChannel( process.getOutputStream() ); } diff --git a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/util/FlushableWritableByteChannel.java b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/util/FlushableWritableByteChannel.java deleted file mode 100644 index e4112f2..0000000 --- a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/util/FlushableWritableByteChannel.java +++ /dev/null @@ -1,68 +0,0 @@ -package org.apache.maven.surefire.extensions.util; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import javax.annotation.Nonnull; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.channels.WritableByteChannel; - -import static java.nio.channels.Channels.newChannel; - -/** - * - */ -final class FlushableWritableByteChannel implements WritableByteChannel -{ - private final OutputStream os; - private final WritableByteChannel channel; - - private FlushableWritableByteChannel( @Nonnull OutputStream os ) - { - this.os = os; - this.channel = newChannel( os ); - } - - static synchronized WritableByteChannel newFlushableChannel( OutputStream os ) - { - return new FlushableWritableByteChannel( os ); - } - - @Override - public int write( ByteBuffer src ) throws IOException - { - int countWrittenBytes = channel.write( src ); - os.flush(); - return countWrittenBytes; - } - - @Override - public boolean isOpen() - { - return channel.isOpen(); - } - - @Override - public void close() throws IOException - { - channel.close(); - } -}