[ https://issues.apache.org/jira/browse/MNG-8031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853595#comment-17853595 ]
ASF GitHub Bot commented on MNG-8031: ------------------------------------- pshevche commented on code in PR #1471: URL: https://github.com/apache/maven/pull/1471#discussion_r1632930409 ########## maven-embedder/src/main/java/org/apache/maven/cli/transfer/SimplexTransferListener.java: ########## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.maven.cli.transfer; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.function.Consumer; + +import org.eclipse.aether.transfer.AbstractTransferListener; +import org.eclipse.aether.transfer.TransferCancelledException; +import org.eclipse.aether.transfer.TransferEvent; +import org.eclipse.aether.transfer.TransferListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.util.Objects.requireNonNull; + +/** + * A {@link TransferListener} implementation that wraps another delegate {@link TransferListener} but makes it run + * on single thread, keeping the listener logic simple. 
This listener also blocks on last transfer event to allow + * output to perform possible cleanup. It spawns a daemon thread to consume queued events that may fall in even + * concurrently. + * + * @since 3.9.7 + */ +public final class SimplexTransferListener extends AbstractTransferListener { + private static final Logger LOGGER = LoggerFactory.getLogger(SimplexTransferListener.class); + private static final int QUEUE_SIZE = 1024; + private static final int BATCH_MAX_SIZE = 500; + private final TransferListener delegate; + private final int batchMaxSize; + private final boolean blockOnLastEvent; + private final ArrayBlockingQueue<Exchange> eventQueue; + + /** + * Constructor that makes passed in delegate run on single thread, and will block on last event. + */ + public SimplexTransferListener(TransferListener delegate) { + this(delegate, QUEUE_SIZE, BATCH_MAX_SIZE, true); + } + + /** + * Constructor that may alter behaviour of this listener. + * + * @param delegate The delegate that should run on single thread. + * @param queueSize The event queue size (default {@code 1024}). + * @param batchMaxSize The maximum batch size delegate should receive (default {@code 500}). + * @param blockOnLastEvent Should this listener block on last transfer end (completed or corrupted) block? (default {@code true}). 
+ */ + public SimplexTransferListener( + TransferListener delegate, int queueSize, int batchMaxSize, boolean blockOnLastEvent) { + this.delegate = requireNonNull(delegate); + if (queueSize < 1 || batchMaxSize < 1) { + throw new IllegalArgumentException("Queue and batch sizes must be greater than 1"); + } + this.batchMaxSize = batchMaxSize; + this.blockOnLastEvent = blockOnLastEvent; + + this.eventQueue = new ArrayBlockingQueue<>(queueSize); + Thread updater = new Thread(this::feedConsumer); + updater.setDaemon(true); + updater.start(); + } + + public TransferListener getDelegate() { + return delegate; + } + + private void feedConsumer() { + final ArrayList<Exchange> batch = new ArrayList<>(batchMaxSize); + try { + while (true) { + batch.clear(); + if (eventQueue.drainTo(batch, BATCH_MAX_SIZE) == 0) { + batch.add(eventQueue.take()); + } + demux(batch); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + private void demux(List<Exchange> exchanges) { + for (Exchange exchange : exchanges) { + exchange.process(transferEvent -> { + TransferEvent.EventType type = transferEvent.getType(); + try { + switch (type) { + case INITIATED: + delegate.transferInitiated(transferEvent); + break; + case STARTED: + delegate.transferStarted(transferEvent); + break; + case PROGRESSED: + delegate.transferProgressed(transferEvent); + break; + case CORRUPTED: + delegate.transferCorrupted(transferEvent); + break; + case SUCCEEDED: + delegate.transferSucceeded(transferEvent); + break; + case FAILED: + delegate.transferFailed(transferEvent); + break; + default: + LOGGER.warn("Invalid TransferEvent.EventType={}; ignoring it", type); + } + } catch (TransferCancelledException e) { + ongoing.put(transferEvent.getResource().getFile(), Boolean.FALSE); + } + }); + } + } + + private void put(TransferEvent event, boolean last) { + try { + Exchange exchange; + if (blockOnLastEvent && last) { + exchange = new BlockingExchange(event); + } else { + exchange = new 
Exchange(event); + } + eventQueue.put(exchange); + exchange.waitForProcessed(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + private final ConcurrentHashMap<File, Boolean> ongoing = new ConcurrentHashMap<>(); + + @Override + public void transferInitiated(TransferEvent event) { + ongoing.putIfAbsent(event.getResource().getFile(), Boolean.TRUE); Review Comment: @cstamas, `TransferResource` supports `file` being `null` ([ref](https://github.com/apache/maven-resolver/blob/master/maven-resolver-api/src/main/java/org/eclipse/aether/transfer/TransferResource.java#L166)). Although the field is deprecated, the replacement is also nullable ([ref](https://github.com/apache/maven-resolver/blob/master/maven-resolver-api/src/main/java/org/eclipse/aether/transfer/TransferResource.java#L178)). However, this listener implementation will break if the resource has no `file` set, because `ConcurrentHashMap` does not permit `null` keys and `putIfAbsent` will throw a `NullPointerException`. Was it a conscious decision? > Backport: Make Maven transfer listener used with Resolver more concurrent > friendly > ---------------------------------------------------------------------------------- > > Key: MNG-8031 > URL: https://issues.apache.org/jira/browse/MNG-8031 > Project: Maven > Issue Type: Improvement > Components: Core > Reporter: Tamas Cservenak > Assignee: Tamas Cservenak > Priority: Major > Fix For: 3.9.7 > > > Backport of MNG-8007 -- This message was sent by Atlassian Jira (v8.20.10#820010)