This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 48da8546b4 ARTEMIS-5982 Fix start Ordering between Acceptor and
Mirroring
48da8546b4 is described below
commit 48da8546b45dd8cd49c166e248f06f09face17a3
Author: Clebert Suconic <[email protected]>
AuthorDate: Tue Mar 31 17:01:39 2026 -0400
ARTEMIS-5982 Fix start Ordering between Acceptor and Mirroring
---
.../amqp/connect/AMQPBrokerConnectionManager.java | 2 +-
.../core/remoting/impl/netty/NettyAcceptor.java | 12 +-
.../connect/LockCoordinatorStartOrderTest.java | 256 +++++++++++++++++++++
3 files changed, 264 insertions(+), 6 deletions(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java
index 95a2284e38..f4d4ad5704 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java
@@ -93,7 +93,7 @@ public class AMQPBrokerConnectionManager implements
ActiveMQComponent, ClientCon
if (configuration.getLockCoordinator() != null) {
LockCoordinator lockCoordinator =
server.getLockCoordinator(configuration.getLockCoordinator());
if (lockCoordinator == null) {
- throw new IllegalStateException("lock coordinator " +
configuration.getName() + " not found");
+ throw new IllegalStateException("lock coordinator " +
configuration.getLockCoordinator() + " not found");
}
amqpBrokerConnection.setLockCoordinator(lockCoordinator);
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
index e4bc1ae8aa..541ddbd385 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
@@ -468,14 +468,16 @@ public class NettyAcceptor extends AbstractAcceptor {
if (lockCoordinator == null) {
internalStart();
} else {
- // The Acceptor needs to start before anything else, so low priority
to start
- lockCoordinator.onLockAcquired(this::internalStart,
LockCoordinator.LOW_PRIORITY);
- // And the Acceptor needs to stop after everything else, so high
priority to stop
- lockCoordinator.onLockReleased(this::internalStop,
LockCoordinator.HIGH_PRIORITY);
+ // The acceptor needs to be the last to start
+ // Mirroring and other components that may be capturing events need
to be started before the acceptor
+ lockCoordinator.onLockAcquired(this::internalStart,
LockCoordinator.HIGH_PRIORITY);
+ // And the Acceptor needs to stop before everything else, so low
priority to stop
+ // This is because we need to stop capturing events to avoid missing
events
+ lockCoordinator.onLockReleased(this::internalStop,
LockCoordinator.LOW_PRIORITY);
}
}
- private void internalStart() throws Exception {
+ protected void internalStart() throws Exception {
if (channelClazz != null) {
// Already started
return;
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/LockCoordinatorStartOrderTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/LockCoordinatorStartOrderTest.java
new file mode 100644
index 0000000000..bee450b96d
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/LockCoordinatorStartOrderTest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.config.LockCoordinatorConfiguration;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
+import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
+import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
+import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory;
+import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
+import
org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener;
+import
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.actors.OrderedExecutor;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Validates the start ordering between an acceptor and an AMQP mirror that are bound to the same lock
+ * coordinator (ARTEMIS-5982).
+ * <p>
+ * Two brokers are configured with a file-based lock coordinator named {@code theLock}. Each broker ties an AMQP
+ * mirror connection towards the other broker, plus an extra acceptor on port 60000, to that lock. The test sends
+ * a message through the locked acceptor of the second broker while that acceptor is still held inside its
+ * after-start callback, and then expects the message to show up on the first broker.
+ */
+public class LockCoordinatorStartOrderTest extends AmqpClientTestSupport {
+
+   protected static final int AMQP_PORT_2 = 5673;
+
+   /** Port of the acceptor that is only started once the lock is acquired; identical on both brokers. */
+   private static final int LOCKED_ACCEPTOR_PORT = 60_000;
+
+   /** Name (and lock id) of the lock coordinator shared by the mirror connections and the locked acceptors. */
+   private static final String LOCK_NAME = "theLock";
+
+   /** Name under which the locked acceptor is registered and later looked up. */
+   private static final String LOCKED_ACCEPTOR_NAME = "locked";
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+   ActiveMQServer server_2;
+   private AssertionLoggerHandler loggerHandler;
+
+   /** Stops the first broker if it exists. */
+   @AfterEach
+   public void stopServer1() throws Exception {
+      if (server != null) {
+         server.stop();
+      }
+   }
+
+   /** Stops the second broker if the test got far enough to create it. */
+   @AfterEach
+   public void stopServer2() throws Exception {
+      if (server_2 != null) {
+         server_2.stop();
+      }
+   }
+
+   /** Installs a fresh log-capturing handler before each test. */
+   @BeforeEach
+   public void startLogging() {
+      loggerHandler = new AssertionLoggerHandler();
+
+   }
+
+   /** Fails the test if log code {@code AMQ222214} was captured, and always releases the log handler. */
+   @AfterEach
+   public void stopLogging() throws Exception {
+      try {
+         assertFalse(loggerHandler.findText("AMQ222214"));
+      } finally {
+         loggerHandler.close();
+      }
+   }
+
+   /** Creates the first broker on {@code AMQP_PORT}; the {@code false} flag is passed through unchanged. */
+   @Override
+   protected ActiveMQServer createServer() throws Exception {
+      return createServer(AMQP_PORT, false);
+   }
+
+   /**
+    * Sends a message through the locked acceptor of the second broker while that acceptor is still blocked in its
+    * after-start callback, then restarts the first broker and expects exactly one message on its queue.
+    */
+   @Test
+   public void testValidateAcceptorStartOrder() throws Exception {
+      String queueName = getQueueName() + RandomUtil.randomUUIDString();
+
+      server.setIdentity("Server1");
+      {
+         AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(300).setRetryInterval(100);
+         amqpConnection.setLockCoordinator(LOCK_NAME);
+         amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true).setQueueCreation(true));
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+         server.getConfiguration().addAcceptorConfiguration(newLockedAcceptor(NETTY_ACCEPTOR_FACTORY));
+         server.getConfiguration().addLockCoordinatorConfiguration(newLockCoordinatorConfiguration());
+         server.getConfiguration().addQueueConfiguration(QueueConfiguration.of(queueName).setRoutingType(RoutingType.ANYCAST));
+      }
+      server.start();
+
+      server_2 = createServer(AMQP_PORT_2, false);
+      server_2.setIdentity("Server2");
+
+      {
+         AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(300).setRetryInterval(100);
+         amqpConnection.setLockCoordinator(LOCK_NAME);
+         amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
+         server_2.getConfiguration().addAMQPConnection(amqpConnection);
+         // the second broker uses the instrumented acceptor so the test can hook into its start
+         server_2.getConfiguration().addAcceptorConfiguration(newLockedAcceptor(LockCoordinatorNettyFactory.class.getName()));
+         server_2.getConfiguration().addLockCoordinatorConfiguration(newLockCoordinatorConfiguration());
+         server_2.getConfiguration().addQueueConfiguration(QueueConfiguration.of(queueName).setRoutingType(RoutingType.ANYCAST));
+      }
+
+      server_2.start();
+
+      Wait.assertNotNull(() -> server_2.locateQueue(queueName), 5000, 100);
+
+      CountDownLatch started = new CountDownLatch(1);
+      CountDownLatch waitSend = new CountDownLatch(1);
+
+      ReplacedNettyAcceptor replacedNettyAcceptor = (ReplacedNettyAcceptor) server_2.getRemotingService().getAcceptor(LOCKED_ACCEPTOR_NAME);
+      assertNotNull(replacedNettyAcceptor);
+      replacedNettyAcceptor.setAfterStartCallback(() -> {
+         started.countDown();
+         try {
+            // hold the acceptor inside internalStart() until the message was sent (or 10 seconds elapsed)
+            waitSend.await(10, TimeUnit.SECONDS);
+         } catch (InterruptedException e) {
+            // keep the interruption visible to the thread that is starting the acceptor
+            Thread.currentThread().interrupt();
+         }
+      });
+
+      try {
+         server.stop();
+
+         assertTrue(started.await(10, TimeUnit.SECONDS));
+
+         ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + LOCKED_ACCEPTOR_PORT);
+         try (Connection connection = factory.createConnection()) {
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            MessageProducer producer = session.createProducer(session.createQueue(queueName));
+            producer.send(session.createTextMessage("RacedMessage"));
+            session.commit();
+         }
+      } finally {
+         // always release the callback, otherwise a failure above leaves the acceptor start blocked for 10 seconds
+         waitSend.countDown();
+      }
+
+      server.start();
+
+      Queue queueOnServer1 = server.locateQueue(queueName);
+
+      assertNotNull(queueOnServer1);
+
+      Wait.assertEquals(1L, queueOnServer1::getMessageCount, 5000, 100);
+
+      server_2.stop();
+
+   }
+
+   /**
+    * Builds the configuration of the acceptor that is bound to the lock coordinator.
+    *
+    * @param factoryClassName acceptor factory class used to create the acceptor
+    * @return a transport configuration named {@code locked}, listening on port 60000 and tied to {@code theLock}
+    */
+   private TransportConfiguration newLockedAcceptor(String factoryClassName) {
+      HashMap<String, Object> params = new HashMap<>();
+      params.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, LOCKED_ACCEPTOR_PORT);
+      TransportConfiguration transportConfiguration = new TransportConfiguration(factoryClassName, params);
+      transportConfiguration.setLockCoordinator(LOCK_NAME);
+      transportConfiguration.setName(LOCKED_ACCEPTOR_NAME);
+      return transportConfiguration;
+   }
+
+   /**
+    * Builds the file-based lock coordinator configuration. Both brokers point at the same locks folder
+    * (the test's temporary directory) and the same lock id, with a check period of 100.
+    *
+    * @return a new lock coordinator configuration named {@code theLock}
+    */
+   private LockCoordinatorConfiguration newLockCoordinatorConfiguration() {
+      HashMap<String, String> properties = new HashMap<>();
+      properties.put("locks-folder", getTemporaryDir());
+      LockCoordinatorConfiguration lockCoordinatorConfiguration = new LockCoordinatorConfiguration(properties);
+      lockCoordinatorConfiguration.setName(LOCK_NAME).setClassName("org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager").setCheckPeriod(100).setLockId(LOCK_NAME);
+      return lockCoordinatorConfiguration;
+   }
+
+
+   /** Acceptor factory that produces {@link ReplacedNettyAcceptor} instances instead of plain Netty acceptors. */
+   public static class LockCoordinatorNettyFactory implements AcceptorFactory {
+
+      @Override
+      public Acceptor createAcceptor(String name,
+                                     ClusterConnection connection,
+                                     Map<String, Object> configuration,
+                                     BufferHandler handler,
+                                     ServerConnectionLifeCycleListener listener,
+                                     Executor threadPool,
+                                     ScheduledExecutorService scheduledThreadPool,
+                                     Map<String, ProtocolManager> protocolMap,
+                                     String threadFactoryGroupName,
+                                     MetricsManager metricsManager) {
+         Executor failureExecutor = new OrderedExecutor(threadPool);
+         return new ReplacedNettyAcceptor(name, connection, configuration, handler, listener, scheduledThreadPool, failureExecutor, protocolMap, threadFactoryGroupName, metricsManager);
+      }
+   }
+
+
+   /** Netty acceptor that runs an optional callback right after {@code internalStart()} of the parent returns. */
+   public static class ReplacedNettyAcceptor extends NettyAcceptor {
+
+      // volatile: set by the test thread, while internalStart() may be executed by a different thread
+      private volatile Runnable afterStartCallback;
+
+      public ReplacedNettyAcceptor(String name,
+                                   ClusterConnection clusterConnection,
+                                   Map<String, Object> configuration,
+                                   BufferHandler handler,
+                                   ServerConnectionLifeCycleListener listener,
+                                   ScheduledExecutorService scheduledThreadPool,
+                                   Executor failureExecutor,
+                                   Map<String, ProtocolManager> protocolMap,
+                                   String threadFactoryGroupName,
+                                   MetricsManager metricsManager) {
+         super(name, clusterConnection, configuration, handler, listener, scheduledThreadPool, failureExecutor, protocolMap, threadFactoryGroupName, metricsManager);
+      }
+
+
+      /** Starts the acceptor through the parent implementation, then runs the callback if one is set. */
+      @Override
+      protected void internalStart() throws Exception {
+         super.internalStart();
+         // read the field once so the null check and the invocation see the same reference
+         Runnable callback = afterStartCallback;
+         if (callback != null) {
+            callback.run();
+         }
+      }
+
+      /**
+       * @return the callback currently registered to run after start, or {@code null} if none
+       */
+      public Runnable getAfterStartCallback() {
+         return afterStartCallback;
+      }
+
+      /**
+       * @param afterStartCallback code to run after the parent {@code internalStart()} completes; may be {@code null}
+       * @return this acceptor, for chaining
+       */
+      public ReplacedNettyAcceptor setAfterStartCallback(Runnable afterStartCallback) {
+         this.afterStartCallback = afterStartCallback;
+         return this;
+      }
+   }
+
+
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]