This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new cdbda6832e ARTEMIS-5609 Add support for acceptor updates on config
reload
cdbda6832e is described below
commit cdbda6832e67ef4d10ae6f6e6f78cf7d86ac46ad
Author: Timothy Bish <[email protected]>
AuthorDate: Tue Aug 5 10:41:49 2025 -0400
ARTEMIS-5609 Add support for acceptor updates on config reload
Allows acceptors to be added, updated and removed from configuration and
that change is handled via the reload manager. Removed or updated acceptors
have all connections under them closed as a consequence of that action.
---
.../remoting/server/impl/RemotingServiceImpl.java | 240 +++++++++++-
.../core/server/impl/ActiveMQServerImpl.java | 79 +---
.../core/server/management/ManagementService.java | 7 +-
.../management/impl/ManagementServiceImpl.java | 1 +
.../artemis/core/server/reload/ReloadManager.java | 9 +
.../core/server/reload/ReloadManagerImpl.java | 17 +-
.../server/group/impl/ClusteredResetMockTest.java | 12 +-
docs/user-manual/config-reload.adoc | 4 +
.../tests/integration/jms/RedeployTest.java | 429 ++++++++++++++++++++-
.../src/test/resources/reload-acceptor-add-new.xml | 37 ++
.../src/test/resources/reload-acceptor-updated.xml | 36 ++
.../src/test/resources/reload-acceptor.xml | 36 ++
.../test/resources/reload-no-acceptor-config.xml | 33 ++
13 files changed, 851 insertions(+), 89 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
index 284893f2a0..25907935ae 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
@@ -16,15 +16,30 @@
*/
package org.apache.activemq.artemis.core.remoting.server.impl;
+import static
org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_SSL_AUTO_RELOAD;
+import static
org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.KEYSTORE_PATH_PROP_NAME;
+import static
org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.KEYSTORE_TYPE_PROP_NAME;
+import static
org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.SSL_AUTO_RELOAD_PROP_NAME;
+import static
org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.TRUSTSTORE_PATH_PROP_NAME;
+import static
org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.TRUSTSTORE_TYPE_PROP_NAME;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
import java.lang.invoke.MethodHandles;
+import java.net.MalformedURLException;
+import java.net.URL;
import java.security.AccessController;
import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -38,6 +53,8 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -46,6 +63,8 @@ import
org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.management.AcceptorControl;
+import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.ConfigurationUtils;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
@@ -62,6 +81,7 @@ import
org.apache.activemq.artemis.core.server.ServiceRegistry;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.core.server.management.ManagementService;
+import org.apache.activemq.artemis.core.server.reload.ReloadManager;
import org.apache.activemq.artemis.logs.AuditLogger;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
@@ -78,6 +98,7 @@ import
org.apache.activemq.artemis.spi.core.remoting.ssl.OpenSSLContextFactoryPr
import
org.apache.activemq.artemis.spi.core.remoting.ssl.SSLContextFactoryProvider;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
+import org.apache.activemq.artemis.utils.PemConfigUtil;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -88,10 +109,11 @@ public class RemotingServiceImpl implements
RemotingService, ServerConnectionLif
private static final int ACCEPTOR_STOP_TIMEOUT = 3000;
+ private static final int UPDATE_ACCEPTORS_STOP_TIMEOUT = 5000;
private volatile boolean started = false;
- private final Set<TransportConfiguration> acceptorsConfig;
+ private final Map<String, TransportConfiguration> acceptorsConfig;
private final List<BaseInterceptor> incomingInterceptors = new
CopyOnWriteArrayList<>();
@@ -140,7 +162,12 @@ public class RemotingServiceImpl implements
RemotingService, ServerConnectionLif
final ServiceRegistry serviceRegistry) {
this.serviceRegistry = serviceRegistry;
- acceptorsConfig = config.getAcceptorConfigurations();
+ if (config.getAcceptorConfigurations() != null &&
!config.getAcceptorConfigurations().isEmpty()) {
+ acceptorsConfig = config.getAcceptorConfigurations().stream()
+
.collect(Collectors.toMap(c -> c.getName(), Function.identity()));
+ } else {
+ acceptorsConfig = new HashMap<>();
+ }
this.server = server;
@@ -206,7 +233,7 @@ public class RemotingServiceImpl implements
RemotingService, ServerConnectionLif
threadPool = Executors.newCachedThreadPool(tFactory);
- for (TransportConfiguration info : acceptorsConfig) {
+ for (TransportConfiguration info : acceptorsConfig.values()) {
createAcceptor(info);
}
@@ -281,6 +308,21 @@ public class RemotingServiceImpl implements
RemotingService, ServerConnectionLif
ActiveMQServerLogger.LOGGER.errorCreatingAcceptor(info.getName(), e);
}
+ if (server.getConfiguration().getConfigurationFileRefreshPeriod() > 0) {
+ // track TLS resources on acceptors and reload on updates
+ final Map<String, Object> config = info.getCombinedParams();
+
+ if (ConfigurationHelper.getBooleanProperty(SSL_AUTO_RELOAD_PROP_NAME,
DEFAULT_SSL_AUTO_RELOAD, config)) {
+ addAcceptorStoreReloadCallback(info.getName(),
+ fileUrlFrom(config.get(KEYSTORE_PATH_PROP_NAME)),
+ storeTypeFrom(config.get(KEYSTORE_TYPE_PROP_NAME)));
+
+ addAcceptorStoreReloadCallback(info.getName(),
+ fileUrlFrom(config.get(TRUSTSTORE_PATH_PROP_NAME)),
+ storeTypeFrom(config.get(TRUSTSTORE_TYPE_PROP_NAME)));
+ }
+ }
+
return acceptor;
}
@@ -329,7 +371,6 @@ public class RemotingServiceImpl implements
RemotingService, ServerConnectionLif
public synchronized void pauseAcceptors() {
if (!started)
return;
-
paused = true;
for (Acceptor acceptor : acceptors.values()) {
@@ -571,11 +612,124 @@ public class RemotingServiceImpl implements
RemotingService, ServerConnectionLif
@Override
public void updateProtocolServices(List<ActiveMQComponent>
protocolServices) throws Exception {
+ updateAcceptors();
+
for (ProtocolManagerFactory protocolManagerFactory :
protocolMap.values()) {
protocolManagerFactory.updateProtocolServices(this.server,
protocolServices);
}
}
+ private void updateAcceptors() throws Exception {
+ final Set<TransportConfiguration> updatedConfigurationSet =
Objects.requireNonNullElse(server.getConfiguration().getAcceptorConfigurations(),
Collections.emptySet());
+ final Map<String, TransportConfiguration> updatedConfiguration =
+ updatedConfigurationSet.stream()
+ .collect(Collectors.toMap(c -> c.getName(),
Function.identity()));
+
+ final Set<TransportConfiguration> acceptorsToStop = new HashSet<>();
+ final Set<TransportConfiguration> acceptorsToCreate = new HashSet<>();
+
+ for (TransportConfiguration candidateConfiguration :
updatedConfiguration.values()) {
+ final TransportConfiguration previous =
acceptorsConfig.get(candidateConfiguration.getName());
+
+ if (previous == null) {
+ // New configuration that was added during the update
+ acceptorsToCreate.add(candidateConfiguration);
+ } else if (!previous.equals(candidateConfiguration)) {
+ // Updated configuration that needs to be stopped and restarted.
+ acceptorsToCreate.add(candidateConfiguration);
+ acceptorsToStop.add(candidateConfiguration);
+ }
+ }
+
+ for (TransportConfiguration currentConfiguration :
acceptorsConfig.values()) {
+ if
(!updatedConfiguration.containsKey(currentConfiguration.getName())) {
+ // Acceptor that was removed from the configuration which needs to be
stopped and removed.
+ acceptorsToStop.add(currentConfiguration);
+ }
+ }
+
+ // Replace old configuration map with new configurations ahead of the
stop and restart phase.
+ acceptorsConfig.clear();
+ acceptorsConfig.putAll(updatedConfiguration);
+
+ final CountDownLatch acceptorsStoppedLatch = new
CountDownLatch(acceptorsToStop.size());
+
+ for (TransportConfiguration acceptorToStop : acceptorsToStop) {
+ final Acceptor acceptor = acceptors.remove(acceptorToStop.getName());
+
+ if (acceptor == null) {
+ continue;
+ }
+
+ final Map<String, Object> acceptorToStopParams =
acceptorToStop.getCombinedParams();
+
+ removeAcceptorStoreReloadCallback(acceptorToStop.getName(),
+ fileUrlFrom(acceptorToStopParams.get(KEYSTORE_PATH_PROP_NAME)),
+ storeTypeFrom(acceptorToStopParams.get(KEYSTORE_TYPE_PROP_NAME)));
+
+ removeAcceptorStoreReloadCallback(acceptorToStop.getName(),
+ fileUrlFrom(acceptorToStopParams.get(TRUSTSTORE_PATH_PROP_NAME)),
+
storeTypeFrom(acceptorToStopParams.get(TRUSTSTORE_TYPE_PROP_NAME)));
+
+ try {
+ managementService.unregisterAcceptor(acceptor.getName());
+ } catch (Throwable t) {
+
ActiveMQServerLogger.LOGGER.errorStoppingAcceptor(acceptor.getName());
+ }
+
+ try {
+ acceptor.notifyStop();
+ } catch (Throwable t) {
+
ActiveMQServerLogger.LOGGER.errorStoppingAcceptor(acceptor.getName());
+ }
+
+ try {
+ acceptor.pause();
+ } catch (Throwable t) {
+
ActiveMQServerLogger.LOGGER.errorStoppingAcceptor(acceptor.getName());
+ }
+
+ try {
+ acceptor.asyncStop(acceptorsStoppedLatch::countDown);
+ } catch (Throwable t) {
+
ActiveMQServerLogger.LOGGER.errorStoppingAcceptor(acceptor.getName());
+ }
+ }
+
+ // In some cases an acceptor stopping could be locked ie NettyAcceptor
stopping could be locked by a network failure.
+ if (!acceptorsStoppedLatch.await(UPDATE_ACCEPTORS_STOP_TIMEOUT,
TimeUnit.MILLISECONDS)) {
+ logger.warn("Timed out waiting on removed or updated acceptors
stopping.");
+ }
+
+ final Collection<Acceptor> acceptorsToStart = new ArrayList<>();
+
+ // Add all the new or updated acceptors now that removed or updated
acceptors have been stopped.
+ for (TransportConfiguration candidateConfiguration : acceptorsToCreate) {
+ final Acceptor acceptor = createAcceptor(candidateConfiguration);
+
+ if (isStarted() && acceptor instanceof NettyAcceptor startable &&
startable.isAutoStart()) {
+ acceptorsToStart.add(startable);
+ }
+ }
+
+ Exception acceptorStartError = null;
+
+ for (Acceptor acceptor : acceptorsToStart) {
+ try {
+ acceptor.start();
+ } catch (Exception e) {
+
ActiveMQServerLogger.LOGGER.errorStartingAcceptor(acceptor.getName(),
acceptor.getConfiguration());
+ if (acceptorStartError == null) {
+ acceptorStartError = e;
+ }
+ }
+ }
+
+ if (acceptorStartError != null) {
+ throw acceptorStartError;
+ }
+ }
+
// ServerConnectionLifeCycleListener implementation
-----------------------------------
private ProtocolManagerFactory getProtocolManager(String protocol) {
@@ -894,4 +1048,82 @@ public class RemotingServiceImpl implements
RemotingService, ServerConnectionLif
}
}
+ private void removeAcceptorStoreReloadCallback(String acceptorName, URL
storeURL, String storeType) {
+ if (storeURL != null) {
+ final ReloadManager reloadManager = server.getReloadManager();
+
+ reloadManager.removeCallbacks(storeURL);
+
+ if (PemConfigUtil.isPemConfigStoreType(storeType)) {
+ String[] sources = null;
+
+ try (InputStream pemConfigStream = storeURL.openStream()) {
+ sources = PemConfigUtil.parseSources(pemConfigStream);
+ } catch (IOException e) {
+
ActiveMQServerLogger.LOGGER.skipSSLAutoReloadForSourcesOfStore(storeURL.getPath(),
e.toString());
+ }
+
+ if (sources != null) {
+ for (String source : sources) {
+ URL sourceURL = fileUrlFrom(source);
+ if (sourceURL != null) {
+ reloadManager.removeCallbacks(sourceURL);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private void addAcceptorStoreReloadCallback(String acceptorName, URL
storeURL, String storeType) {
+ if (storeURL != null) {
+ server.getReloadManager().addCallback(storeURL, (uri) -> {
+ // preference for Control to capture consistent audit logging
+ if (managementService != null) {
+ Object targetControl =
managementService.getResource(ResourceNames.ACCEPTOR + acceptorName);
+ if (targetControl instanceof AcceptorControl acceptorControl) {
+ acceptorControl.reload();
+ }
+ }
+ });
+
+ if (PemConfigUtil.isPemConfigStoreType(storeType)) {
+ String[] sources = null;
+
+ try (InputStream pemConfigStream = storeURL.openStream()) {
+ sources = PemConfigUtil.parseSources(pemConfigStream);
+ } catch (IOException e) {
+
ActiveMQServerLogger.LOGGER.skipSSLAutoReloadForSourcesOfStore(storeURL.getPath(),
e.toString());
+ }
+
+ if (sources != null) {
+ for (String source : sources) {
+ URL sourceURL = fileUrlFrom(source);
+ if (sourceURL != null) {
+ addAcceptorStoreReloadCallback(acceptorName, sourceURL,
null);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private static URL fileUrlFrom(Object o) {
+ if (o instanceof String string) {
+ try {
+ return new File(string).toURI().toURL();
+ } catch (MalformedURLException ignored) {
+ }
+ }
+
+ return null;
+ }
+
+ private static String storeTypeFrom(Object o) {
+ if (o instanceof String string) {
+ return string;
+ }
+
+ return null;
+ }
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index fbfe0fe976..4d06e8879c 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -16,15 +16,12 @@
*/
package org.apache.activemq.artemis.core.server.impl;
-import javax.management.MBeanServer;
import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.invoke.MethodHandles;
import java.lang.management.ManagementFactory;
-import java.net.MalformedURLException;
import java.net.URL;
import java.security.AccessController;
import java.security.PrivilegedAction;
@@ -59,6 +56,8 @@ import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import javax.management.MBeanServer;
+
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -67,7 +66,6 @@ import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.management.AcceptorControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
@@ -208,10 +206,8 @@ import
org.apache.activemq.artemis.spi.core.security.jaas.PropertiesLoader;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor;
import org.apache.activemq.artemis.utils.CompositeAddress;
-import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.OpenWireUUIDUtil;
-import org.apache.activemq.artemis.utils.PemConfigUtil;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.SecurityFormatter;
import org.apache.activemq.artemis.utils.ThreadDumpUtil;
@@ -230,12 +226,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.stream.Collectors.groupingBy;
-import static
org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_SSL_AUTO_RELOAD;
-import static
org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.KEYSTORE_PATH_PROP_NAME;
-import static
org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.KEYSTORE_TYPE_PROP_NAME;
-import static
org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.SSL_AUTO_RELOAD_PROP_NAME;
-import static
org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.TRUSTSTORE_PATH_PROP_NAME;
-import static
org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.TRUSTSTORE_TYPE_PROP_NAME;
import static
org.apache.activemq.artemis.utils.collections.IterableStream.iterableOf;
/**
@@ -3410,20 +3400,6 @@ public class ActiveMQServerImpl implements
ActiveMQServer {
PropertiesLoader.reload();
});
}
-
- // track tls resources on acceptors and reload via remoting server
- configuration.getAcceptorConfigurations().forEach((acceptorConfig) ->
{
- Map<String, Object> config = acceptorConfig.getCombinedParams();
- if
(ConfigurationHelper.getBooleanProperty(SSL_AUTO_RELOAD_PROP_NAME,
DEFAULT_SSL_AUTO_RELOAD, config)) {
- addAcceptorStoreReloadCallback(acceptorConfig.getName(),
- fileUrlFrom(config.get(KEYSTORE_PATH_PROP_NAME)),
- storeTypeFrom(config.get(KEYSTORE_TYPE_PROP_NAME)));
-
- addAcceptorStoreReloadCallback(acceptorConfig.getName(),
- fileUrlFrom(config.get(TRUSTSTORE_PATH_PROP_NAME)),
- storeTypeFrom(config.get(TRUSTSTORE_TYPE_PROP_NAME)));
- }
- });
}
if (hasBrokerPlugins()) {
@@ -3437,56 +3413,6 @@ public class ActiveMQServerImpl implements
ActiveMQServer {
return true;
}
- private void addAcceptorStoreReloadCallback(String acceptorName, URL
storeURL, String storeType) {
- if (storeURL != null) {
- reloadManager.addCallback(storeURL, (uri) -> {
- // preference for Control to capture consistent audit logging
- if (managementService != null) {
- Object targetControl =
managementService.getResource(ResourceNames.ACCEPTOR + acceptorName);
- if (targetControl instanceof AcceptorControl acceptorControl) {
- acceptorControl.reload();
- }
- }
- });
-
- if (PemConfigUtil.isPemConfigStoreType(storeType)) {
- String[] sources = null;
-
- try (InputStream pemConfigStream = storeURL.openStream()) {
- sources = PemConfigUtil.parseSources(pemConfigStream);
- } catch (IOException e) {
-
ActiveMQServerLogger.LOGGER.skipSSLAutoReloadForSourcesOfStore(storeURL.getPath(),
e.toString());
- }
-
- if (sources != null) {
- for (String source : sources) {
- URL sourceURL = fileUrlFrom(source);
- if (sourceURL != null) {
- addAcceptorStoreReloadCallback(acceptorName, sourceURL,
null);
- }
- }
- }
- }
- }
- }
-
- private URL fileUrlFrom(Object o) {
- if (o instanceof String string) {
- try {
- return new File(string).toURI().toURL();
- } catch (MalformedURLException ignored) {
- }
- }
- return null;
- }
-
- private String storeTypeFrom(Object o) {
- if (o instanceof String string) {
- return string;
- }
- return null;
- }
-
@Override
public void installMirrorController(MirrorController mirrorController) {
logger.debug("Mirror controller is being installed");
@@ -4697,6 +4623,7 @@ public class ActiveMQServerImpl implements ActiveMQServer
{
configuration.setQueueConfigs(config.getQueueConfigs());
configuration.setBridgeConfigurations(config.getBridgeConfigurations());
configuration.setConnectorConfigurations(config.getConnectorConfigurations());
+
configuration.setAcceptorConfigurations(config.getAcceptorConfigurations());
configuration.setAMQPConnectionConfigurations(config.getAMQPConnection());
configurationReloadDeployed.set(false);
if (isActive()) {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java
index 2f2151a859..8ad304fe80 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java
@@ -16,10 +16,11 @@
*/
package org.apache.activemq.artemis.core.server.management;
-import javax.management.ObjectName;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
+import javax.management.ObjectName;
+
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
@@ -46,11 +47,11 @@ import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.RemoteBrokerConnection;
-import org.apache.activemq.artemis.core.server.routing.ConnectionRouter;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.routing.ConnectionRouter;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
@@ -107,6 +108,8 @@ public interface ManagementService extends
NotificationService, ActiveMQComponen
void registerAcceptor(Acceptor acceptor, TransportConfiguration
configuration) throws Exception;
+ void unregisterAcceptor(String acceptorName) throws Exception;
+
void unregisterAcceptors();
void registerDivert(Divert divert) throws Exception;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
index 8eece25ac2..561b9818d5 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
@@ -399,6 +399,7 @@ public class ManagementServiceImpl implements
ManagementService {
}
}
+ @Override
public void unregisterAcceptor(final String name) throws Exception {
unregisterFromJMX(objectNameBuilder.getAcceptorObjectName(name));
unregisterFromRegistry(ResourceNames.ACCEPTOR + name);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManager.java
index 480c458509..d4c3ffdc9e 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManager.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.server.reload;
+import java.net.URI;
import java.net.URL;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
@@ -24,6 +25,14 @@ public interface ReloadManager extends ActiveMQComponent {
void addCallback(URL uri, ReloadCallback callback);
+ /**
+ * Remove any callback instances previously registered with this manager.
+ *
+ * @param uri
+ * The {@link URI} used to add callback instances to this manager
+ */
+ void removeCallbacks(URL uri);
+
/**
* Callback for the next tick
*/
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java
index 3887fb40e6..19189b9909 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java
@@ -18,10 +18,10 @@ package org.apache.activemq.artemis.core.server.reload;
import java.io.File;
import java.net.URL;
-import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -38,7 +38,7 @@ public class ReloadManagerImpl extends
ActiveMQScheduledComponent implements Rel
private volatile Runnable tick;
- private final Map<URL, ReloadRegistry> registry = new HashMap<>();
+ private final Map<URL, ReloadRegistry> registry = new ConcurrentHashMap<>();
public ReloadManagerImpl(ScheduledExecutorService scheduledExecutorService,
Executor executor, long checkPeriod) {
super(scheduledExecutorService, executor, checkPeriod,
TimeUnit.MILLISECONDS, false);
@@ -54,6 +54,15 @@ public class ReloadManagerImpl extends
ActiveMQScheduledComponent implements Rel
this.tick = tick;
}
+ @Override
+ public synchronized void removeCallbacks(URL uri) {
+ final ReloadRegistry uriRegistry = registry.remove(uri);
+
+ if (uriRegistry != null) {
+ uriRegistry.clear();
+ }
+ }
+
@Override
public synchronized void addCallback(URL uri, ReloadCallback callback) {
if (!isStarted()) {
@@ -141,5 +150,9 @@ public class ReloadManagerImpl extends
ActiveMQScheduledComponent implements Rel
public void add(ReloadCallback callback) {
callbacks.add(callback);
}
+
+ public void clear() {
+ callbacks.clear();
+ }
}
}
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java
index e9fd36c868..42a5e17588 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java
@@ -18,12 +18,13 @@ package org.apache.activemq.artemis.core.server.group.impl;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import javax.management.ObjectName;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import javax.management.ObjectName;
+
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
@@ -50,15 +51,15 @@ import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.RemoteBrokerConnection;
-import
org.apache.activemq.artemis.core.server.management.GuardInvocationHandler;
-import org.apache.activemq.artemis.core.server.routing.ConnectionRouter;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import
org.apache.activemq.artemis.core.server.management.GuardInvocationHandler;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationListener;
+import org.apache.activemq.artemis.core.server.routing.ConnectionRouter;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
@@ -281,6 +282,11 @@ public class ClusteredResetMockTest extends ServerTestBase
{
}
+ @Override
+ public void unregisterAcceptor(String name) {
+
+ }
+
@Override
public void unregisterAcceptors() {
diff --git a/docs/user-manual/config-reload.adoc
b/docs/user-manual/config-reload.adoc
index c70aa9ea82..67e6865553 100644
--- a/docs/user-manual/config-reload.adoc
+++ b/docs/user-manual/config-reload.adoc
@@ -16,6 +16,7 @@ Once the configuration file is changed (broker.xml) the
following modules will b
* Diverts
* Addresses & Queues
* Bridges
+* Acceptors
If using xref:configuration-index.adoc#modularising-broker-xml[modularised
`broker.xml`] ensure you also read
xref:configuration-index.adoc#reloading-modular-configuration-files[reloading
modular configuration files]
@@ -550,6 +551,9 @@ Below lists the effects of adding, deleting and updating of
an element/attribute
| The queue user will be set to the new value after reloading
|===
+==== `<acceptors>`
+Adding, updating and removing an `<acceptor>` is supported. Updating or
removing an `<acceptor>` results in the closure of all connections that were
accepted previously. Added or updated acceptors are automatically started
during the configuration reload process unless the `auto-start` option is set
to false.
+
=== `<jms>` _(Deprecated)_
=== `<queues>` _(Deprecated)_
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
index d263ea5816..f0b7661e7d 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
@@ -18,20 +18,25 @@ package org.apache.activemq.artemis.tests.integration.jms;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import java.io.File;
import java.io.InputStream;
+import java.io.Writer;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -46,11 +51,20 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.api.core.management.AcceptorControl;
+import org.apache.activemq.artemis.api.core.management.ResourceNames;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding;
@@ -62,14 +76,15 @@ import
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancing
import
org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.reload.ReloadManager;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
-import org.apache.activemq.artemis.tests.util.Wait;
import
org.apache.activemq.artemis.tests.unit.core.postoffice.impl.fakes.FakeQueue;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.junit.jupiter.api.Test;
@@ -1454,6 +1469,416 @@ public class RedeployTest extends ActiveMQTestBase {
}
}
+ @Test
+ public void testRedeployNewAcceptorsAndRemoveOldAcceptor() throws Exception
{
+ Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
+ URL url1 =
RedeployTest.class.getClassLoader().getResource("reload-acceptor.xml");
+ URL url2 =
RedeployTest.class.getClassLoader().getResource("reload-acceptor-add-new.xml");
+ Files.copy(url1.openStream(), brokerXML);
+
+ final MBeanServer mBeanServer = MBeanServerFactory.createMBeanServer();
+ runAfter(() -> MBeanServerFactory.releaseMBeanServer(mBeanServer));
+
+ EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
+ embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
+ embeddedActiveMQ.setMbeanServer(mBeanServer);
+ embeddedActiveMQ.start();
+
+ final ReusableLatch latch = new ReusableLatch(1);
+ final Runnable tick = latch::countDown;
+ final ManagementService managementService =
embeddedActiveMQ.getActiveMQServer().getManagementService();
+
+ embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
+
+ try {
+ latch.await(10, TimeUnit.SECONDS);
+
+ final TransportConfiguration acceptor0 =
findInConfiguration("artemis",
embeddedActiveMQ.getActiveMQServer().getConfiguration());
+
+ assertNotNull(acceptor0);
+ assertEquals("127.0.0.1",
acceptor0.getParams().get(TransportConstants.HOST_PROP_NAME));
+ assertEquals("61616",
acceptor0.getParams().get(TransportConstants.PORT_PROP_NAME));
+
+ final AcceptorControl acceptorControl = (AcceptorControl)
managementService.getResource(ResourceNames.ACCEPTOR + "artemis");
+
+ assertNotNull(acceptorControl);
+ assertTrue(acceptorControl.isStarted());
+
+ @SuppressWarnings("resource")
+ final ConnectionFactory connectionFactory = new
ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
+ final Connection connection = connectionFactory.createConnection();
+
+ try (Session session = connection.createSession()) {
+ final Queue queue = session.createQueue("test-queue");
+
+ session.createConsumer(queue);
+ } catch (Exception e) {
+ fail("Should be able to connect and create resources");
+ }
+
+ Files.copy(url2.openStream(), brokerXML,
StandardCopyOption.REPLACE_EXISTING);
+ brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
+ latch.setCount(1);
+ embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
+ latch.await(10, TimeUnit.SECONDS);
+
+ // Should not be present in the server any longer
+ final AcceptorControl afterReload = (AcceptorControl)
managementService.getResource(ResourceNames.ACCEPTOR + "artemis");
+ assertNull(afterReload);
+
+ final TransportConfiguration acceptor1 =
findInConfiguration("artemis1",
embeddedActiveMQ.getActiveMQServer().getConfiguration());
+ final TransportConfiguration acceptor2 =
findInConfiguration("artemis2",
embeddedActiveMQ.getActiveMQServer().getConfiguration());
+
+ assertNotNull(acceptor1);
+ assertEquals("127.0.0.2",
acceptor1.getParams().get(TransportConstants.HOST_PROP_NAME));
+ assertEquals("61617",
acceptor1.getParams().get(TransportConstants.PORT_PROP_NAME));
+
+ final AcceptorControl acceptorControl1 = (AcceptorControl)
managementService.getResource(ResourceNames.ACCEPTOR + "artemis1");
+
+ assertNotNull(acceptorControl1);
+ assertTrue(acceptorControl1.isStarted());
+
+ assertNotNull(acceptor2);
+ assertEquals("127.0.0.3",
acceptor2.getParams().get(TransportConstants.HOST_PROP_NAME));
+ assertEquals("61618",
acceptor2.getParams().get(TransportConstants.PORT_PROP_NAME));
+
+ final AcceptorControl acceptorControl2 = (AcceptorControl)
managementService.getResource(ResourceNames.ACCEPTOR + "artemis2");
+
+ assertNotNull(acceptorControl2);
+ assertFalse(acceptorControl2.isStarted());
+
+ // Our original connection on the removed acceptor should have been
closed.
+ try (Session session = connection.createSession()) {
+ final Queue queue = session.createQueue("test-queue");
+
+ session.createConsumer(queue);
+ fail("Should have been disconnected when acceptor was removed.");
+ } catch (Exception e) {
+ }
+ } finally {
+ embeddedActiveMQ.stop();
+ }
+ }
+
+ @Test
+ public void testRedeploySameAcceptorChangedToNotAutoStart() throws
Exception {
+ Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
+ URL url1 =
RedeployTest.class.getClassLoader().getResource("reload-acceptor.xml");
+ URL url2 =
RedeployTest.class.getClassLoader().getResource("reload-acceptor-updated.xml");
+ Files.copy(url1.openStream(), brokerXML);
+
+ final MBeanServer mBeanServer = MBeanServerFactory.createMBeanServer();
+ runAfter(() -> MBeanServerFactory.releaseMBeanServer(mBeanServer));
+
+ EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
+ embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
+ embeddedActiveMQ.setMbeanServer(mBeanServer);
+ embeddedActiveMQ.start();
+
+ final ReusableLatch latch = new ReusableLatch(1);
+ final Runnable tick = latch::countDown;
+ final ManagementService managementService =
embeddedActiveMQ.getActiveMQServer().getManagementService();
+
+ embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
+
+ try {
+ latch.await(10, TimeUnit.SECONDS);
+
+ final TransportConfiguration acceptor =
findInConfiguration("artemis",
embeddedActiveMQ.getActiveMQServer().getConfiguration());
+
+ assertNotNull(acceptor);
+ assertEquals("127.0.0.1",
acceptor.getParams().get(TransportConstants.HOST_PROP_NAME));
+ assertEquals("61616",
acceptor.getParams().get(TransportConstants.PORT_PROP_NAME));
+ assertNull(acceptor.getParams().get(TransportConstants.AUTO_START));
+
+ final AcceptorControl acceptorControl = (AcceptorControl)
managementService.getResource(ResourceNames.ACCEPTOR + "artemis");
+
+ assertNotNull(acceptorControl);
+ assertTrue(acceptorControl.isStarted());
+
+ @SuppressWarnings("resource")
+ final ConnectionFactory connectionFactory = new
ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
+ final Connection connection = connectionFactory.createConnection();
+
+ try (Session session = connection.createSession()) {
+ final Queue queue = session.createQueue("test-queue");
+
+ session.createConsumer(queue);
+ } catch (Exception e) {
+ fail("Should be able to connect and create resources");
+ }
+
+ Files.copy(url2.openStream(), brokerXML,
StandardCopyOption.REPLACE_EXISTING);
+ brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
+ latch.setCount(1);
+ embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
+ latch.await(10, TimeUnit.SECONDS);
+
+ final TransportConfiguration acceptorUpdated =
findInConfiguration("artemis",
embeddedActiveMQ.getActiveMQServer().getConfiguration());
+
+ assertNotNull(acceptorUpdated);
+ assertEquals("127.0.0.1",
acceptorUpdated.getParams().get(TransportConstants.HOST_PROP_NAME));
+ assertEquals("61616",
acceptorUpdated.getParams().get(TransportConstants.PORT_PROP_NAME));
+ assertEquals("false",
acceptorUpdated.getParams().get(TransportConstants.AUTO_START));
+
+ final AcceptorControl acceptorControl2 = (AcceptorControl)
managementService.getResource(ResourceNames.ACCEPTOR + "artemis");
+
+ assertNotNull(acceptorControl2);
+
+ // Re-added and not started according to new configuration
+ assertFalse(acceptorControl2.isStarted());
+
+ // Our original connection on the removed acceptor should have been
closed.
+ try (Session session = connection.createSession()) {
+ final Queue queue = session.createQueue("test-queue");
+
+ session.createConsumer(queue);
+ fail("Should have been disconnected when acceptor was removed.");
+ } catch (Exception e) {
+ }
+
+ // New connection should fail because the acceptor is no longer auto-started
+ try (ActiveMQConnectionFactory connectionFactory1 = new
ActiveMQConnectionFactory("tcp://127.0.0.1:61616")) {
+ connectionFactory1.createConnection();
+ fail("Should not be able to connect");
+ } catch (Exception e) {
+ }
+ } finally {
+ embeddedActiveMQ.stop();
+ }
+ }
+
+ @Test
+ public void
testUpdatedAcceptorStillWatchesForTLSPropertiesUpdatesAfterReload() throws
Exception {
+ Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
+ Path brokerProperties =
getTestDirfile().toPath().resolve("broker.properties");
+ URL url1 =
RedeployTest.class.getClassLoader().getResource("reload-no-acceptor-config.xml");
+ Files.copy(url1.openStream(), brokerXML);
+
+ // reference Key Store from temporary location that we can update later
+ File keyStoreToReload = new File(getTestDirfile(), "server-ks.p12");
+ copyRecursive(new
File(this.getClass().getClassLoader().getResource("unknown-server-keystore.p12").getFile()),
keyStoreToReload);
+
+ final Properties properties = new
ConfigurationImpl.InsertionOrderedProperties();
+
+ properties.put("acceptorConfigurations.tcp.factoryClassName",
NETTY_ACCEPTOR_FACTORY);
+ properties.put("acceptorConfigurations.tcp.params.host", "127.0.0.1");
+ properties.put("acceptorConfigurations.tcp.params.port", "61617");
+ properties.put("acceptorConfigurations.tcp.params." +
TransportConstants.SSL_AUTO_RELOAD_PROP_NAME, "true");
+ properties.put("acceptorConfigurations.tcp.params." +
TransportConstants.SSL_ENABLED_PROP_NAME, "true");
+ properties.put("acceptorConfigurations.tcp.params." +
TransportConstants.KEYSTORE_PATH_PROP_NAME, keyStoreToReload.getAbsolutePath());
+ properties.put("acceptorConfigurations.tcp.params." +
TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, "securepass");
+
+ Writer propertiesWriter = Files.newBufferedWriter(brokerProperties,
StandardOpenOption.WRITE,
+
StandardOpenOption.CREATE,
+
StandardOpenOption.TRUNCATE_EXISTING);
+ try {
+ properties.store(propertiesWriter, null);
+ } finally {
+ propertiesWriter.flush();
+ propertiesWriter.close();
+ }
+
+ final MBeanServer mBeanServer = MBeanServerFactory.createMBeanServer();
+ runAfter(() -> MBeanServerFactory.releaseMBeanServer(mBeanServer));
+
+ EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
+ embeddedActiveMQ.setPropertiesResourcePath(brokerProperties.toString());
+ embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
+ embeddedActiveMQ.setMbeanServer(mBeanServer);
+ embeddedActiveMQ.start();
+
+ final ReusableLatch latch = new ReusableLatch(1);
+ final Runnable tick = latch::countDown;
+
+ embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
+
+ try {
+ latch.await(10, TimeUnit.SECONDS);
+
+ final TransportConfiguration acceptor = findInConfiguration("tcp",
embeddedActiveMQ.getActiveMQServer().getConfiguration());
+
+ assertNotNull(acceptor);
+ assertEquals("127.0.0.1",
acceptor.getParams().get(TransportConstants.HOST_PROP_NAME));
+ assertEquals("61617",
acceptor.getParams().get(TransportConstants.PORT_PROP_NAME));
+
+ String url =
"tcp://127.0.0.1:61616?sslEnabled=true;trustStorePath=server-ca-truststore.p12;trustStorePassword=securepass";
+ ServerLocator locator =
addServerLocator(ActiveMQClient.createServerLocator(url)).setCallTimeout(3000);
+
+ try {
+ createSessionFactory(locator);
+ fail("Creating session here should fail due to SSL handshake
problems.");
+ } catch (Exception ignored) {
+ }
+
+ // Update the broker properties file with the correct port to trigger
configuration reload.
+ // This should retain the ability to watch the TLS resources for
updates and after the reload
+ // the test will update the key store to one that allows the client
to connect and we should
+ // see the client succeed eventually.
+ properties.put("acceptorConfigurations.tcp.params.port", "61616");
+
+ propertiesWriter = Files.newBufferedWriter(brokerProperties,
StandardOpenOption.WRITE,
+
StandardOpenOption.TRUNCATE_EXISTING);
+ try {
+ properties.store(propertiesWriter, null);
+ } finally {
+ propertiesWriter.flush();
+ propertiesWriter.close();
+ }
+
+ latch.setCount(1);
+ embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
+ latch.await(10, TimeUnit.SECONDS);
+
+ final TransportConfiguration updatedAcceptor =
findInConfiguration("tcp",
embeddedActiveMQ.getActiveMQServer().getConfiguration());
+
+ assertNotNull(updatedAcceptor);
+ assertEquals("127.0.0.1",
updatedAcceptor.getParams().get(TransportConstants.HOST_PROP_NAME));
+ assertEquals("61616",
updatedAcceptor.getParams().get(TransportConstants.PORT_PROP_NAME));
+
+ // update the server side key store with one that actually works with
the client trust store
+ copyRecursive(new
File(this.getClass().getClassLoader().getResource("server-keystore.p12").getFile()),
keyStoreToReload);
+
+ // expect success after auto reload, which we wait for
+ Wait.assertTrue(() -> {
+ try {
+ addSessionFactory(createSessionFactory(locator));
+ return true;
+ } catch (Throwable ignored) {
+ }
+ return false;
+ }, 5000, 100);
+ } finally {
+ embeddedActiveMQ.stop();
+ }
+ }
+
+ @Test
+ public void testReplacedAcceptorWatchesForTLSPropertiesUpdatesAfterReload()
throws Exception {
+ Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
+ Path brokerProperties =
getTestDirfile().toPath().resolve("broker.properties");
+ URL url1 =
RedeployTest.class.getClassLoader().getResource("reload-no-acceptor-config.xml");
+ Files.copy(url1.openStream(), brokerXML);
+
+ // reference Key Store from temporary location that we can update later
+ File keyStoreToReload = new File(getTestDirfile(), "server-ks.p12");
+ copyRecursive(new
File(this.getClass().getClassLoader().getResource("unknown-server-keystore.p12").getFile()),
keyStoreToReload);
+
+ final Properties properties = new
ConfigurationImpl.InsertionOrderedProperties();
+
+ properties.put("acceptorConfigurations.tcp1.factoryClassName",
NETTY_ACCEPTOR_FACTORY);
+ properties.put("acceptorConfigurations.tcp1.params.host", "127.0.0.1");
+ properties.put("acceptorConfigurations.tcp1.params.port", "61617");
+ properties.put("acceptorConfigurations.tcp1.params." +
TransportConstants.SSL_AUTO_RELOAD_PROP_NAME, "true");
+ properties.put("acceptorConfigurations.tcp1.params." +
TransportConstants.SSL_ENABLED_PROP_NAME, "true");
+ properties.put("acceptorConfigurations.tcp1.params." +
TransportConstants.KEYSTORE_PATH_PROP_NAME, keyStoreToReload.getAbsolutePath());
+ properties.put("acceptorConfigurations.tcp1.params." +
TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, "securepass");
+
+ Writer propertiesWriter = Files.newBufferedWriter(brokerProperties,
StandardOpenOption.WRITE,
+
StandardOpenOption.CREATE,
+
StandardOpenOption.TRUNCATE_EXISTING);
+ try {
+ properties.store(propertiesWriter, null);
+ } finally {
+ propertiesWriter.flush();
+ propertiesWriter.close();
+ }
+
+ final MBeanServer mBeanServer = MBeanServerFactory.createMBeanServer();
+ runAfter(() -> MBeanServerFactory.releaseMBeanServer(mBeanServer));
+
+ EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
+ embeddedActiveMQ.setPropertiesResourcePath(brokerProperties.toString());
+ embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
+ embeddedActiveMQ.setMbeanServer(mBeanServer);
+ embeddedActiveMQ.start();
+
+ final ReusableLatch latch = new ReusableLatch(1);
+ final Runnable tick = latch::countDown;
+
+ embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
+
+ try {
+ latch.await(10, TimeUnit.SECONDS);
+
+ final TransportConfiguration acceptor = findInConfiguration("tcp1",
embeddedActiveMQ.getActiveMQServer().getConfiguration());
+
+ assertNotNull(acceptor);
+ assertEquals("127.0.0.1",
acceptor.getParams().get(TransportConstants.HOST_PROP_NAME));
+ assertEquals("61617",
acceptor.getParams().get(TransportConstants.PORT_PROP_NAME));
+
+ String url =
"tcp://127.0.0.1:61616?sslEnabled=true;trustStorePath=server-ca-truststore.p12;trustStorePassword=securepass";
+ ServerLocator locator =
addServerLocator(ActiveMQClient.createServerLocator(url)).setCallTimeout(3000);
+
+ try {
+ createSessionFactory(locator);
+ fail("Creating session here should fail due to SSL handshake
problems.");
+ } catch (Exception ignored) {
+ }
+
+ // Update the broker properties file with a new acceptor on the
correct port to trigger configuration
+ // reload. This should retain the ability to watch the TLS resources
for updates and after the reload
+ // the test will update the key store to one that allows the client
to connect and we should see the
+ // client succeed eventually.
+ properties.clear();
+ properties.put("acceptorConfigurations.tcp2.factoryClassName",
NETTY_ACCEPTOR_FACTORY);
+ properties.put("acceptorConfigurations.tcp2.params.host",
"127.0.0.1");
+ properties.put("acceptorConfigurations.tcp2.params.port", "61616");
+ properties.put("acceptorConfigurations.tcp2.params." +
TransportConstants.SSL_AUTO_RELOAD_PROP_NAME, "true");
+ properties.put("acceptorConfigurations.tcp2.params." +
TransportConstants.SSL_ENABLED_PROP_NAME, "true");
+ properties.put("acceptorConfigurations.tcp2.params." +
TransportConstants.KEYSTORE_PATH_PROP_NAME, keyStoreToReload.getAbsolutePath());
+ properties.put("acceptorConfigurations.tcp2.params." +
TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, "securepass");
+
+ propertiesWriter = Files.newBufferedWriter(brokerProperties,
StandardOpenOption.WRITE,
+
StandardOpenOption.TRUNCATE_EXISTING);
+ try {
+ properties.store(propertiesWriter, null);
+ } finally {
+ propertiesWriter.flush();
+ propertiesWriter.close();
+ }
+
+ latch.setCount(1);
+ embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
+ latch.await(10, TimeUnit.SECONDS);
+
+ // Old acceptor should be null but new one should be present and
should be watching for updates
+ assertNull(findInConfiguration("tcp1",
embeddedActiveMQ.getActiveMQServer().getConfiguration()));
+ final TransportConfiguration updatedAcceptor =
findInConfiguration("tcp2",
embeddedActiveMQ.getActiveMQServer().getConfiguration());
+
+ assertNotNull(updatedAcceptor);
+ assertEquals("127.0.0.1",
updatedAcceptor.getParams().get(TransportConstants.HOST_PROP_NAME));
+ assertEquals("61616",
updatedAcceptor.getParams().get(TransportConstants.PORT_PROP_NAME));
+
+ // update the server side key store with one that actually works with
the client trust store
+ copyRecursive(new
File(this.getClass().getClassLoader().getResource("server-keystore.p12").getFile()),
keyStoreToReload);
+
+ // expect success after auto reload, which we wait for
+ Wait.assertTrue(() -> {
+ try {
+ addSessionFactory(createSessionFactory(locator));
+ return true;
+ } catch (Throwable ignored) {
+ }
+ return false;
+ }, 5000, 100);
+ } finally {
+ embeddedActiveMQ.stop();
+ }
+ }
+
+ private TransportConfiguration findInConfiguration(String acceptorName,
Configuration configuration) {
+ final Set<TransportConfiguration> acceptors =
configuration.getAcceptorConfigurations();
+
+ for (TransportConfiguration acceptorConfig : acceptors) {
+ if (acceptorName.equals(acceptorConfig.getName())) {
+ return acceptorConfig;
+ }
+ }
+
+ return null;
+ }
+
private AddressSettings getAddressSettings(EmbeddedActiveMQ
embeddedActiveMQ, String address) {
return
embeddedActiveMQ.getActiveMQServer().getAddressSettingsRepository().getMatch(address);
}
diff --git
a/tests/integration-tests/src/test/resources/reload-acceptor-add-new.xml
b/tests/integration-tests/src/test/resources/reload-acceptor-add-new.xml
new file mode 100644
index 0000000000..cd5d04756f
--- /dev/null
+++ b/tests/integration-tests/src/test/resources/reload-acceptor-add-new.xml
@@ -0,0 +1,37 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq
/schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core">
+
+
<configuration-file-refresh-period>100</configuration-file-refresh-period>
+
+ <persistence-enabled>false</persistence-enabled>
+ <security-enabled>false</security-enabled>
+
+ <acceptors>
+ <acceptor name="artemis1">tcp://127.0.0.2:61617</acceptor>
+ <acceptor
name="artemis2">tcp://127.0.0.3:61618?autoStart=false</acceptor>
+ </acceptors>
+ </core>
+</configuration>
diff --git
a/tests/integration-tests/src/test/resources/reload-acceptor-updated.xml
b/tests/integration-tests/src/test/resources/reload-acceptor-updated.xml
new file mode 100644
index 0000000000..5d0c612e1b
--- /dev/null
+++ b/tests/integration-tests/src/test/resources/reload-acceptor-updated.xml
@@ -0,0 +1,36 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq
/schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core">
+
+
<configuration-file-refresh-period>100</configuration-file-refresh-period>
+
+ <persistence-enabled>false</persistence-enabled>
+ <security-enabled>false</security-enabled>
+
+ <acceptors>
+ <acceptor
name="artemis">tcp://127.0.0.1:61616?autoStart=false</acceptor>
+ </acceptors>
+ </core>
+</configuration>
diff --git a/tests/integration-tests/src/test/resources/reload-acceptor.xml
b/tests/integration-tests/src/test/resources/reload-acceptor.xml
new file mode 100644
index 0000000000..4e8952e4df
--- /dev/null
+++ b/tests/integration-tests/src/test/resources/reload-acceptor.xml
@@ -0,0 +1,36 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq
/schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core">
+
+
<configuration-file-refresh-period>100</configuration-file-refresh-period>
+
+ <persistence-enabled>false</persistence-enabled>
+ <security-enabled>false</security-enabled>
+
+ <acceptors>
+ <acceptor name="artemis">tcp://127.0.0.1:61616</acceptor>
+ </acceptors>
+ </core>
+</configuration>
diff --git
a/tests/integration-tests/src/test/resources/reload-no-acceptor-config.xml
b/tests/integration-tests/src/test/resources/reload-no-acceptor-config.xml
new file mode 100644
index 0000000000..a2611bc8f9
--- /dev/null
+++ b/tests/integration-tests/src/test/resources/reload-no-acceptor-config.xml
@@ -0,0 +1,33 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq
/schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core">
+
+
<configuration-file-refresh-period>100</configuration-file-refresh-period>
+
+ <persistence-enabled>false</persistence-enabled>
+ <security-enabled>false</security-enabled>
+
+ </core>
+</configuration>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact