This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 730940c423 ARTEMIS-5973 Add websocket compression option to acceptor
allowed options
730940c423 is described below
commit 730940c4239d1d9afab66bc120f26b5b0716f2b9
Author: Timothy Bish <[email protected]>
AuthorDate: Thu Mar 26 10:58:44 2026 -0400
ARTEMIS-5973 Add websocket compression option to acceptor allowed options
The configuration option needs to be in the defined set of options an
acceptor
can have on the URI in order for it to be in parameters Map to be enabled on
start of the acceptor itself.
---
.../remoting/impl/netty/TransportConstants.java | 1 +
.../config/impl/FileConfigurationParserTest.java | 35 +++++
docs/user-manual/amqp.adoc | 2 +
docs/user-manual/mqtt.adoc | 2 +
docs/user-manual/stomp.adoc | 2 +
.../amqp/AmqpWebSocketCompressionConfigTest.java | 141 +++++++++++++++++++++
.../src/test/resources/ws-compression-disabled.xml | 135 ++++++++++++++++++++
.../src/test/resources/ws-compression-enabled.xml | 135 ++++++++++++++++++++
8 files changed, 453 insertions(+)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
index 85206672a1..46d0745b33 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
@@ -483,6 +483,7 @@ public class TransportConstants {
allowableAcceptorKeys.add(TransportConstants.CONNECTIONS_ALLOWED);
allowableAcceptorKeys.add(TransportConstants.STOMP_MAX_FRAME_PAYLOAD_LENGTH);
allowableAcceptorKeys.add(TransportConstants.WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH);
+
allowableAcceptorKeys.add(TransportConstants.WEB_SOCKET_COMPRESSION_SUPPORTED);
allowableAcceptorKeys.add(ActiveMQDefaultConfiguration.getPropMaskPassword());
allowableAcceptorKeys.add(ActiveMQDefaultConfiguration.getPropPasswordCodec());
allowableAcceptorKeys.add(TransportConstants.BACKLOG_PROP_NAME);
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
index 6c81cd83ee..04dc87e104 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
@@ -39,6 +39,7 @@ import java.util.stream.Collectors;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.FederationConfiguration;
@@ -1347,4 +1348,38 @@ public class FileConfigurationParserTest extends
ServerTestBase {
AddressSettings settings = configuration.getAddressSettings().get("foo");
assertEquals(DiskFullMessagePolicy.FAIL,
settings.getDiskFullMessagePolicy());
}
+
+ @Test
+ public void testWebSocketCompressionOptionInAcceptorURI() throws Exception {
+ final String FIRST_PART = """
+ <core xmlns="urn:activemq:core">
+ <name>ActiveMQ.main.config</name>
+
<log-delegate-factory-class-name>org.apache.activemq.artemis.integration.logging.Log4jLogDelegateFactory</log-delegate-factory-class-name>
+
<bindings-directory>${jboss.server.data.dir}/activemq/bindings</bindings-directory>
+
<journal-directory>${jboss.server.data.dir}/activemq/journal</journal-directory>
+ <journal-min-files>10</journal-min-files>
+
<large-messages-directory>${jboss.server.data.dir}/activemq/largemessages</large-messages-directory>
+
<paging-directory>${jboss.server.data.dir}/activemq/paging</paging-directory>
+ <acceptors>
+ <acceptor name="ws-mqtt-test">
tcp://0.0.0.0:2994?protocols=MQTT;webSocketCompressionSupported=true</acceptor>
+ </acceptors>
+ """;
+
+ String configStr = FIRST_PART + LAST_PART;
+ FileConfigurationParser parser = new FileConfigurationParser();
+ ByteArrayInputStream input = new
ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8));
+
+ Configuration configuration = parser.parseMainConfig(input);
+
+ Set<TransportConfiguration> acceptors =
configuration.getAcceptorConfigurations();
+
+ assertEquals(1, acceptors.size());
+
+ TransportConfiguration acceptor = acceptors.iterator().next();
+
+ assertNotNull(acceptor);
+
+
assertTrue(acceptor.getParams().containsKey("webSocketCompressionSupported"));
+
assertFalse(acceptor.getExtraParams().containsKey("webSocketCompressionSupported"));
+ }
}
diff --git a/docs/user-manual/amqp.adoc b/docs/user-manual/amqp.adoc
index 55522f7a7e..31032c2347 100644
--- a/docs/user-manual/amqp.adoc
+++ b/docs/user-manual/amqp.adoc
@@ -207,3 +207,5 @@ AMQP over WebSockets is supported via a normal AMQP
acceptor:
With this configuration, the broker will accept AMQP connections over
WebSockets on the port `5672`.
Web browsers can then connect to `ws://<server>:5672` using a Web Socket to
send and receive AMQP messages.
+WebSockets per-message deflate is supported but not enabled by default, to
enable it add the `webSocketCompressionSupported=true` option to the acceptor
URI.
+The client must also support per-message deflate and request it via the
standard WebSockets extension.
diff --git a/docs/user-manual/mqtt.adoc b/docs/user-manual/mqtt.adoc
index 1a08c04f59..3e48ddafe6 100644
--- a/docs/user-manual/mqtt.adoc
+++ b/docs/user-manual/mqtt.adoc
@@ -189,6 +189,8 @@ SSL/TLS is also available, e.g.:
----
Web browsers can then connect to `wss://<server>:8883` using a Web Socket to
send and receive MQTT messages.
+WebSockets per-message deflate is supported but not enabled by default, to
enable it add the `webSocketCompressionSupported=true` option to the acceptor
URI.
+The client must also support per-message deflate and request it via the
standard WebSockets extension.
== Link Stealing
diff --git a/docs/user-manual/stomp.adoc b/docs/user-manual/stomp.adoc
index 8d59642ecb..6c2878ae77 100644
--- a/docs/user-manual/stomp.adoc
+++ b/docs/user-manual/stomp.adoc
@@ -264,6 +264,8 @@ STOMP over WebSockets is supported via the normal STOMP
acceptor:
With this configuration, the broker will accept STOMP connections over
WebSockets on the port `61614`.
Web browsers can then connect to `ws://<server>:61614` using a Web Socket to
send and receive STOMP messages.
+WebSockets per-message deflate is supported but not enabled by default, to
enable it add the `webSocketCompressionSupported=true` option to the acceptor
URI.
+The client must also support per-message deflate and request it via the
standard WebSockets extension.
A companion JavaScript library to ease client-side development is available
from https://github.com/jmesnil/stomp-websocket[GitHub] (please see its
http://jmesnil.net/stomp-websocket/doc/[documentation] for a complete
description).
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpWebSocketCompressionConfigTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpWebSocketCompressionConfigTest.java
new file mode 100644
index 0000000000..d7af2b3fb5
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpWebSocketCompressionConfigTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.net.URL;
+import java.util.concurrent.TimeUnit;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
+import org.apache.activemq.artemis.tests.integration.jms.RedeployTest;
+import org.apache.qpid.protonj2.test.driver.ProtonTestClient;
+import org.apache.qpid.protonj2.test.driver.ProtonTestClientOptions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+/**
+ * Test connections via Web Sockets
+ */
+public class AmqpWebSocketCompressionConfigTest extends AmqpClientTestSupport {
+
+ private static final int SERVER_PORT = 5678;
+
+ final URL urlServerWSCoff =
RedeployTest.class.getClassLoader().getResource("ws-compression-disabled.xml");
+ final URL urlServerWSCon =
RedeployTest.class.getClassLoader().getResource("ws-compression-enabled.xml");
+
+ @Override
+ protected ActiveMQServer createServer() throws Exception {
+ return createServer(AMQP_PORT, false);
+ }
+
+ @Test
+ @Timeout(20)
+ public void testClientConnectsWithWebSocketCompressionOn() throws Exception
{
+ final EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
+
+ try {
+
embeddedActiveMQ.setConfigResourcePath(urlServerWSCon.toURI().toString());
+ embeddedActiveMQ.start();
+
+ testClientConnectsWithWebSockets(true);
+ } finally {
+ embeddedActiveMQ.stop();
+ }
+ }
+
+ @Test
+ @Timeout(20)
+ public void testClientConnectsWithWebSocketCompressionOff() throws
Exception {
+ final EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
+
+ try {
+
embeddedActiveMQ.setConfigResourcePath(urlServerWSCoff.toURI().toString());
+ embeddedActiveMQ.start();
+
+ testClientConnectsWithWebSockets(false);
+ } finally {
+ embeddedActiveMQ.stop();
+ }
+ }
+
+ private void testClientConnectsWithWebSockets(boolean serverCompressionOn)
throws Exception {
+ final ProtonTestClientOptions clientOpts = new ProtonTestClientOptions();
+
+ clientOpts.setUseWebSockets(true);
+ clientOpts.setWebSocketCompression(true);
+
+ try (ProtonTestClient client = new ProtonTestClient(clientOpts)) {
+ client.queueClientSaslAnonymousConnect();
+ client.remoteOpen().queue();
+ client.expectOpen();
+ client.remoteBegin().queue();
+ client.expectBegin();
+ client.connect("localhost", SERVER_PORT);
+
+ client.waitForScriptToComplete(5, TimeUnit.MINUTES);
+
+ if (serverCompressionOn) {
+ assertTrue(client.isWSCompressionActive());
+ } else {
+ assertFalse(client.isWSCompressionActive());
+ }
+
+ client.expectAttach().ofSender();
+ client.expectAttach().ofReceiver();
+ client.expectFlow();
+
+ // Attach a sender and receiver
+ client.remoteAttach().ofReceiver()
+ .withName("ws-compression-test")
+ .withSource().withAddress(getQueueName())
+ .withCapabilities("queue").also()
+ .withTarget().and()
+ .now();
+ client.remoteFlow().withLinkCredit(10).now();
+ client.remoteAttach().ofSender()
+ .withInitialDeliveryCount(0)
+ .withName("ws-compression-test")
+ .withTarget().withAddress(getQueueName())
+ .withCapabilities("queue").also()
+ .withSource().and()
+ .now();
+
+ client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final String payload = "test-data:" + "A".repeat(1000);
+
+ // Broker sends message to subscription and acknowledges to sender
+ client.expectTransfer().withMessage().withValue(payload);
+ client.expectDisposition().withSettled(true).withState().accepted();
+
+ // Client sends message to queue with subscription
+ client.remoteTransfer().withDeliveryId(0)
+ .withBody().withValue(payload).also()
+ .now();
+
+ client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ client.expectClose();
+ client.remoteClose().now();
+
+ client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ client.close();
+ }
+ }
+}
diff --git
a/tests/integration-tests/src/test/resources/ws-compression-disabled.xml
b/tests/integration-tests/src/test/resources/ws-compression-disabled.xml
new file mode 100644
index 0000000000..69956912b0
--- /dev/null
+++ b/tests/integration-tests/src/test/resources/ws-compression-disabled.xml
@@ -0,0 +1,135 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:xi="http://www.w3.org/2001/XInclude"
+ xsi:schemaLocation="urn:activemq
/schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq:core ">
+
+ <name>0.0.0.0</name>
+
+ <persistence-enabled>false</persistence-enabled>
+
+ <journal-type>NIO</journal-type>
+
+ <paging-directory>data/paging</paging-directory>
+
+ <bindings-directory>data/bindings</bindings-directory>
+
+ <journal-directory>data/journal</journal-directory>
+
+ <large-messages-directory>data/large-messages</large-messages-directory>
+
+ <journal-datasync>true</journal-datasync>
+
+ <journal-min-files>2</journal-min-files>
+
+ <journal-pool-files>10</journal-pool-files>
+
+ <journal-device-block-size>4096</journal-device-block-size>
+
+ <journal-file-size>10M</journal-file-size>
+
+ <journal-buffer-timeout>40000</journal-buffer-timeout>
+
+ <journal-max-io>1</journal-max-io>
+
+ <disk-scan-period>5000</disk-scan-period>
+
+ <max-disk-usage>90</max-disk-usage>
+
+ <critical-analyzer>false</critical-analyzer>
+
+ <critical-analyzer-timeout>120000</critical-analyzer-timeout>
+
+ <critical-analyzer-check-period>60000</critical-analyzer-check-period>
+
+ <critical-analyzer-policy>HALT</critical-analyzer-policy>
+
+ <security-enabled>false</security-enabled>
+
+ <page-sync-timeout>40000</page-sync-timeout>
+
+ <acceptors>
+ <acceptor
name="artemis">tcp://0.0.0.0:5678?;protocols=AMQP;webSocketCompressionSupported=false</acceptor>
+ </acceptors>
+
+ <security-settings>
+
+ <security-setting match="#">
+ <permission type="createNonDurableQueue" roles="amq"/>
+ <permission type="deleteNonDurableQueue" roles="amq"/>
+ <permission type="createDurableQueue" roles="amq"/>
+ <permission type="deleteDurableQueue" roles="amq"/>
+ <permission type="createAddress" roles="amq"/>
+ <permission type="deleteAddress" roles="amq"/>
+ <permission type="consume" roles="amq"/>
+ <permission type="browse" roles="amq"/>
+ <permission type="send" roles="amq"/>
+ <permission type="manage" roles="amq"/>
+ </security-setting>
+
+ </security-settings>
+
+ <address-settings>
+ <address-setting match="activemq.management.#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <address-setting match="#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ </address-settings>
+
+ <addresses>
+ <address name="DLQ">
+ <anycast>
+ <queue name="DLQ" />
+ </anycast>
+ </address>
+ <address name="ExpiryQueue">
+ <anycast>
+ <queue name="ExpiryQueue" />
+ </anycast>
+ </address>
+ <address name="testQueue">
+ <anycast>
+ <queue name="testQueue" />
+ </anycast>
+ </address>
+ </addresses>
+ </core>
+</configuration>
diff --git
a/tests/integration-tests/src/test/resources/ws-compression-enabled.xml
b/tests/integration-tests/src/test/resources/ws-compression-enabled.xml
new file mode 100644
index 0000000000..21f0c9058a
--- /dev/null
+++ b/tests/integration-tests/src/test/resources/ws-compression-enabled.xml
@@ -0,0 +1,135 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:xi="http://www.w3.org/2001/XInclude"
+ xsi:schemaLocation="urn:activemq
/schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq:core ">
+
+ <name>0.0.0.0</name>
+
+ <persistence-enabled>false</persistence-enabled>
+
+ <journal-type>NIO</journal-type>
+
+ <paging-directory>data/paging</paging-directory>
+
+ <bindings-directory>data/bindings</bindings-directory>
+
+ <journal-directory>data/journal</journal-directory>
+
+ <large-messages-directory>data/large-messages</large-messages-directory>
+
+ <journal-datasync>true</journal-datasync>
+
+ <journal-min-files>2</journal-min-files>
+
+ <journal-pool-files>10</journal-pool-files>
+
+ <journal-device-block-size>4096</journal-device-block-size>
+
+ <journal-file-size>10M</journal-file-size>
+
+ <journal-buffer-timeout>40000</journal-buffer-timeout>
+
+ <journal-max-io>1</journal-max-io>
+
+ <disk-scan-period>5000</disk-scan-period>
+
+ <max-disk-usage>90</max-disk-usage>
+
+ <critical-analyzer>false</critical-analyzer>
+
+ <critical-analyzer-timeout>120000</critical-analyzer-timeout>
+
+ <critical-analyzer-check-period>60000</critical-analyzer-check-period>
+
+ <critical-analyzer-policy>HALT</critical-analyzer-policy>
+
+ <security-enabled>false</security-enabled>
+
+ <page-sync-timeout>40000</page-sync-timeout>
+
+ <acceptors>
+ <acceptor
name="artemis">tcp://0.0.0.0:5678?;protocols=AMQP;webSocketCompressionSupported=true</acceptor>
+ </acceptors>
+
+ <security-settings>
+
+ <security-setting match="#">
+ <permission type="createNonDurableQueue" roles="amq"/>
+ <permission type="deleteNonDurableQueue" roles="amq"/>
+ <permission type="createDurableQueue" roles="amq"/>
+ <permission type="deleteDurableQueue" roles="amq"/>
+ <permission type="createAddress" roles="amq"/>
+ <permission type="deleteAddress" roles="amq"/>
+ <permission type="consume" roles="amq"/>
+ <permission type="browse" roles="amq"/>
+ <permission type="send" roles="amq"/>
+ <permission type="manage" roles="amq"/>
+ </security-setting>
+
+ </security-settings>
+
+ <address-settings>
+ <address-setting match="activemq.management.#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <address-setting match="#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ </address-settings>
+
+ <addresses>
+ <address name="DLQ">
+ <anycast>
+ <queue name="DLQ" />
+ </anycast>
+ </address>
+ <address name="ExpiryQueue">
+ <anycast>
+ <queue name="ExpiryQueue" />
+ </anycast>
+ </address>
+ <address name="testQueue">
+ <anycast>
+ <queue name="testQueue" />
+ </anycast>
+ </address>
+ </addresses>
+ </core>
+</configuration>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]