This is an automated email from the ASF dual-hosted git repository. ggregory pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/commons-csv.git
The following commit(s) were added to refs/heads/master by this push:
     new bf09a8b6 [CSV-318] printRecord() hangs if fed a parallel stream
bf09a8b6 is described below

commit bf09a8b676e6d3077b62df034209e875b45f112b
Author: Gary Gregory <garydgreg...@gmail.com>
AuthorDate: Tue May 6 15:26:01 2025 -0400

    [CSV-318] printRecord() hangs if fed a parallel stream

    Add tests
---
 .../org/apache/commons/csv/JiraCsv318Test.java | 145 +++++++++++++++++++++
 1 file changed, 145 insertions(+)

diff --git a/src/test/java/org/apache/commons/csv/JiraCsv318Test.java b/src/test/java/org/apache/commons/csv/JiraCsv318Test.java
new file mode 100644
index 00000000..d16f790f
--- /dev/null
+++ b/src/test/java/org/apache/commons/csv/JiraCsv318Test.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.commons.csv;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Stream;
+
+import org.apache.commons.io.function.IOConsumer;
+import org.apache.commons.io.function.IOStream;
+import org.apache.commons.lang3.ArrayUtils;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests https://issues.apache.org/jira/projects/CSV/issues/CSV-318?filter=allopenissues
+ *
+ * @see CSVPrinter
+ */
+public class JiraCsv318Test {
+
+    private void checkOutput(final ByteArrayOutputStream baos) {
+        checkOutput(baos.toString());
+    }
+
+    private void checkOutput(final String string) {
+        assertEquals("col a,col b,col c", string.trim());
+    }
+
+    private Stream<String> newParallelStream() {
+        // returned stream is intermediate
+        return newStream().parallel();
+    }
+
+    private CSVPrinter newPrinter(final ByteArrayOutputStream baos) throws IOException {
+        return new CSVPrinter(new PrintWriter(baos), CSVFormat.DEFAULT);
+    }
+
+    private Stream<String> newSequentialStream() {
+        // returned stream is intermediate
+        return newStream().sequential();
+    }
+
+    private Stream<String> newStream() {
+        return Stream.of("col a", "col b", "col c");
+    }
+
+    public synchronized void printRecord(final Stream<?> values) throws IOException {
+        // IOStream.adapt(values).forEachOrdered(this::print);
+    }
+
+    @Test
+    void testDefaultStream() throws IOException {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (CSVPrinter printer = newPrinter(baos)) {
+            printer.printRecord(newStream());
+        }
+        checkOutput(baos);
+    }
+
+    @SuppressWarnings("resource")
+    @Test
+    void testParallelIOStream() throws IOException {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (CSVPrinter printer = newPrinter(baos)) {
+            IOStream.adapt(newParallelStream()).forEachOrdered(printer::print);
+        }
+        // No EOR marker in this test intentionally, so checkOutput will trim.
+        checkOutput(baos);
+    }
+
+    @SuppressWarnings("resource")
+    @Test
+    @Disabled("Deadlock because CSVPrinter.print(Object) is synchronized")
+    void testParallelIOStreamSynchronizedPrinter() throws IOException {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (CSVPrinter printer = newPrinter(baos)) {
+            synchronized (printer) {
+                IOStream.adapt(newParallelStream()).forEachOrdered(printer::print);
+            }
+        }
+        // No EOR marker in this test intentionally, so checkOutput will trim.
+        checkOutput(baos);
+    }
+
+    @SuppressWarnings("resource")
+    @Test
+    void testParallelIOStreamSynchronizedPrinterNotUsed() throws IOException {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (CSVPrinter printer = newPrinter(baos)) {
+            synchronized (printer) {
+                IOStream.adapt(newParallelStream()).forEachOrdered(IOConsumer.noop());
+            }
+        }
+        final List<String> list = new ArrayList<>();
+        try (CSVPrinter printer = newPrinter(baos)) {
+            synchronized (printer) {
+                IOStream.adapt(newParallelStream()).forEachOrdered(list::add);
+            }
+        }
+        // No EOR marker in this test intentionally, so checkOutput will trim.
+        checkOutput(String.join(",", list.toArray(ArrayUtils.EMPTY_STRING_ARRAY)));
+    }
+
+    @Test
+    @Disabled("Deadlock because CSVPrinter.print(Object) is synchronized")
+    void testParallelStream() throws IOException {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (CSVPrinter printer = newPrinter(baos)) {
+            printer.printRecord(newParallelStream());
+        }
+        checkOutput(baos);
+    }
+
+    @Test
+    void testSequentialStream() throws IOException {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (CSVPrinter printer = newPrinter(baos)) {
+            printer.printRecord(newSequentialStream());
+        }
+        checkOutput(baos);
+    }
+}