cnauroth commented on code in PR #4248:
URL: https://github.com/apache/hadoop/pull/4248#discussion_r955418817
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java:
##########
@@ -528,8 +545,33 @@ public void collect(Object key, Object value) throws
IOException {
* could not be closed properly.
*/
public void close() throws IOException {
+ int nThreads = 10;
+ AtomicReference<IOException> ioException = new AtomicReference<>();
+ ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+
+ List<Callable<Object>> callableList = new ArrayList<>();
+
for (RecordWriter writer : recordWriters.values()) {
- writer.close(null);
+ callableList.add(() -> {
+ try {
+ writer.close(null);
+ throw new IOException();
+ } catch (IOException e) {
+ ioException.set(e);
+ }
+ return null;
+ });
+ }
+ try {
+ executorService.invokeAll(callableList);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ executorService.shutdown();
Review Comment:
Sorry, I think I gave some bad advice here. I see now that you're using
`invokeAll`, and that method only returns after all invocations complete.
Therefore, we know the work is all done, and we can proceed to `shutdown`.
Calling `awaitTermination` opens up a new problem: how to decide on the
timeout, here arbitrarily chosen as 50 seconds. Since we don't really need
`awaitTermination`, we might as well remove it and avoid that problem.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]