This is an automated email from the ASF dual-hosted git repository. gemmellr pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/artemis.git
commit ed9d372d23c018cef0036089355f01705e16cb53 Author: Domenico Francesco Bruscino <[email protected]> AuthorDate: Tue Apr 28 11:51:04 2026 +0200 ARTEMIS-6033 Close the connection factory when the CLI transfer command ends --- .../artemis/cli/commands/messages/Transfer.java | 182 ++++++++++----------- 1 file changed, 91 insertions(+), 91 deletions(-) diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Transfer.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Transfer.java index ea543d9207..efdcaeecf9 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Transfer.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Transfer.java @@ -342,108 +342,108 @@ public class Transfer extends InputAbstract { } private int doTransfer(ActionContext context) throws Exception { - ConnectionFactoryClosable sourceConnectionFactory = createConnectionFactory("source", sourceProtocol, sourceURL, sourceUser, sourcePassword, sourceClientID); - Connection sourceConnection = sourceConnectionFactory.createConnection(); - - Session sourceSession = sourceConnection.createSession(Session.SESSION_TRANSACTED); - Destination sourceDestination = createDestination("source", sourceSession, sourceQueue, sourceTopic); - MessageConsumer consumer = null; - if (sourceDestination instanceof Queue) { - if (filter != null) { - consumer = sourceSession.createConsumer(sourceDestination, filter); - } else { - consumer = sourceSession.createConsumer(sourceDestination); - } - } else if (sourceDestination instanceof Topic topic) { + try (ConnectionFactoryClosable sourceConnectionFactory = createConnectionFactory("source", sourceProtocol, sourceURL, sourceUser, sourcePassword, sourceClientID); + Connection sourceConnection = sourceConnectionFactory.createConnection(); + Session sourceSession = sourceConnection.createSession(Session.SESSION_TRANSACTED)) { - if (durableConsumer != null) { - if (filter != null) { - consumer = sourceSession.createDurableConsumer(topic, durableConsumer); - } else { - consumer = sourceSession.createDurableConsumer(topic, durableConsumer, filter, noLocal); - } - } else if (sharedDurableSubscription != null) { + Destination sourceDestination = createDestination("source", sourceSession, sourceQueue, sourceTopic); + MessageConsumer consumer = null; + if (sourceDestination instanceof Queue) { if (filter != null) { - consumer = sourceSession.createSharedDurableConsumer(topic, sharedDurableSubscription, filter); + consumer = sourceSession.createConsumer(sourceDestination, filter); } else { - consumer = sourceSession.createSharedDurableConsumer(topic, sharedDurableSubscription); + consumer = sourceSession.createConsumer(sourceDestination); } - } else if (sharedSubscription != null) { - if (filter != null) { - consumer = sourceSession.createSharedConsumer(topic, sharedSubscription, filter); + } else if (sourceDestination instanceof Topic topic) { + + if (durableConsumer != null) { + if (filter != null) { + consumer = sourceSession.createDurableConsumer(topic, durableConsumer); + } else { + consumer = sourceSession.createDurableConsumer(topic, durableConsumer, filter, noLocal); + } + } else if (sharedDurableSubscription != null) { + if (filter != null) { + consumer = sourceSession.createSharedDurableConsumer(topic, sharedDurableSubscription, filter); + } else { + consumer = sourceSession.createSharedDurableConsumer(topic, sharedDurableSubscription); + } + } else if (sharedSubscription != null) { + if (filter != null) { + consumer = sourceSession.createSharedConsumer(topic, sharedSubscription, filter); + } else { + consumer = sourceSession.createSharedConsumer(topic, sharedSubscription); + } } else { - consumer = sourceSession.createSharedConsumer(topic, sharedSubscription); + throw new IllegalArgumentException("you must specify either --durable-consumer, --shared-durable-subscription or --shared-subscription with a JMS topic"); } - } else { - throw new IllegalArgumentException("you must specify either --durable-consumer, --shared-durable-subscription or --shared-subscription with a JMS topic"); - } - } - - ConnectionFactoryClosable targetConnectionFactory = createConnectionFactory("target", targetProtocol, targetURL, targetUser, targetPassword, null); - Connection targetConnection = targetConnectionFactory.createConnection(); - Session targetSession = targetConnection.createSession(Session.SESSION_TRANSACTED); - Destination targetDestination = createDestination("target", targetSession, targetQueue, targetTopic); - MessageProducer producer = targetSession.createProducer(targetDestination); - - if (sourceURL.equals(targetURL) && sourceDestination.equals(targetDestination)) { - context.out.println("You cannot transfer between " + sourceURL + "/" + sourceDestination + " and " + targetURL + "/" + targetDestination + ".\n" + "That would create an infinite recursion."); - throw new IllegalArgumentException("cannot use " + sourceDestination + " == " + targetDestination); - } - - sourceConnection.start(); - int pending = 0, total = 0; - while (total < messageCount) { - - Message receivedMessage; - if (receiveTimeout < 0) { - receivedMessage = consumer.receive(); - } else if (receiveTimeout == 0) { - receivedMessage = consumer.receiveNoWait(); - } else { - receivedMessage = consumer.receive(receiveTimeout); } - if (receivedMessage == null) { - if (isVerbose()) { - context.out.println("could not receive any more messages"); - } - break; - } - producer.send(receivedMessage); - pending++; - total++; - - if (isVerbose()) { - context.out.println("Received message " + total + " with " + pending + " messages pending to be commited"); - } - if (pending > commitInterval) { - context.out.println("Transferred " + pending + " messages of " + total); - pending = 0; - targetSession.commit(); - if (!isCopy()) { - sourceSession.commit(); + try (MessageConsumer sourceConsumer = consumer; + ConnectionFactoryClosable targetConnectionFactory = createConnectionFactory("target", targetProtocol, targetURL, targetUser, targetPassword, null); + Connection targetConnection = targetConnectionFactory.createConnection(); + Session targetSession = targetConnection.createSession(Session.SESSION_TRANSACTED)) { + + Destination targetDestination = createDestination("target", targetSession, targetQueue, targetTopic); + + try (MessageProducer producer = targetSession.createProducer(targetDestination)) { + + if (sourceURL.equals(targetURL) && sourceDestination.equals(targetDestination)) { + context.out.println("You cannot transfer between " + sourceURL + "/" + sourceDestination + " and " + targetURL + "/" + targetDestination + ".\n" + "That would create an infinite recursion."); + throw new IllegalArgumentException("cannot use " + sourceDestination + " == " + targetDestination); + } + + sourceConnection.start(); + int pending = 0, total = 0; + while (total < messageCount) { + + Message receivedMessage; + if (receiveTimeout < 0) { + receivedMessage = sourceConsumer.receive(); + } else if (receiveTimeout == 0) { + receivedMessage = sourceConsumer.receiveNoWait(); + } else { + receivedMessage = sourceConsumer.receive(receiveTimeout); + } + + if (receivedMessage == null) { + if (isVerbose()) { + context.out.println("could not receive any more messages"); + } + break; + } + producer.send(receivedMessage); + pending++; + total++; + + if (isVerbose()) { + context.out.println("Received message " + total + " with " + pending + " messages pending to be commited"); + } + if (pending > commitInterval) { + context.out.println("Transferred " + pending + " messages of " + total); + pending = 0; + targetSession.commit(); + if (!isCopy()) { + sourceSession.commit(); + } + } + } + + context.out.println("Transferred a total of " + total + " messages"); + + if (pending != 0) { + targetSession.commit(); + if (isCopy()) { + sourceSession.rollback(); + } else { + sourceSession.commit(); + } + } + + return total; } } } - - context.out.println("Transferred a total of " + total + " messages"); - - if (pending != 0) { - targetSession.commit(); - if (isCopy()) { - sourceSession.rollback(); - } else { - sourceSession.commit(); - } - } - - sourceConnection.close(); - sourceConnectionFactory.close(); - - targetConnection.close(); - targetConnectionFactory.close(); - - return total; } Destination createDestination(String role, Session session, String queue, String topic) throws Exception { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
