gemmellr commented on code in PR #4833:
URL: https://github.com/apache/activemq-artemis/pull/4833#discussion_r1505615830
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java:
##########
@@ -1562,7 +1562,7 @@ public final Object getObjectProperty(String key) {
return getAMQPUserID();
case MessageUtil.CORRELATIONID_HEADER_NAME_STRING:
if (properties != null && properties.getCorrelationId() != null) {
- return
AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(properties.getCorrelationId());
+ return
AMQPMessageIdHelper.INSTANCE.toCorrelationIdStringOrBytes(properties.getCorrelationId());
Review Comment:
So this is actually going to potentially break some exiting usage. Was it
decided thats ok? Plus not to offer ability to restore the prior behaviour?
##########
artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java:
##########
@@ -159,7 +160,9 @@ public static org.apache.activemq.artemis.api.core.Message
inbound(final Message
coreMessage.putIntProperty(OpenWireConstants.AMQ_MSG_COMMAND_ID,
messageSend.getCommandId());
final String corrId = messageSend.getCorrelationId();
if (corrId != null) {
-
coreMessage.putStringProperty(OpenWireConstants.JMS_CORRELATION_ID_PROPERTY,
new SimpleString(corrId));
+ // this mimics what the OpenWire JMS client will do when it writes
the correlation ID before sending
+ byte[] bytes = corrId.getBytes(StandardCharsets.UTF_8);
+ coreMessage.setCorrelationID(bytes);
Review Comment:
This also seems like it is going to break a bunch of things. Stuff that got
a String before, will now get bytes/Binary instead. Even though a String is
almost certainly what was sent originally.
E.g try sending a String CorrelationID from the OpenWire JMS client and
retrieving a String CorrelationID from the AMQP JMS client. Before it would see
exactly what the original client sent, as a String. Now it will now return an
encoded binary hex since it will actually receive a Binary correlationID
instead of a String one?
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageIdHelper.java:
##########
@@ -130,6 +130,8 @@ public String toCorrelationIdString(Object idObject) {
// It has "ID:" prefix and doesn't have encoding prefix, use it
as-is.
return stringId;
}
+ } else if (idObject instanceof Binary) {
+ return ((Binary)idObject).getArray();
Review Comment:
Strictly speaking, its possible the array isnt just the id...the Binary
should be checked that it doesnt have an array offset and is the same length as
the array.
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSCorrelationIDTest.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Test that correlation ID is handled as expected between JMS clients.
+ */
+public class JMSCorrelationIDTest extends MultiprotocolJMSClientTestSupport {
+
+ private void testCorrelationIDAsBytesSendReceive(Connection
producerConnection, Connection consumerConnection) throws Throwable {
+ Session session = producerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getQueueName());
+
+ byte[] bytes = new byte[0xf + 1];
+ for (int i = 0; i <= 0xf; i++) {
+ bytes[i] = (byte) i;
+ }
+
+ MessageProducer producer = session.createProducer(queue);
+ Message message = session.createMessage();
+ message.setJMSCorrelationIDAsBytes(bytes);
+ producer.send(message);
+ producer.close();
+
+ Session sessionConsumer = consumerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue consumerQueue = sessionConsumer.createQueue(getQueueName());
+ final MessageConsumer consumer =
sessionConsumer.createConsumer(consumerQueue);
+
+ Message m = consumer.receive(5000);
+ Assert.assertNotNull("Could not receive message on consumer", m);
+
+ Assert.assertArrayEquals(bytes, m.getJMSCorrelationIDAsBytes());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsBytesSendReceiveFromAMQPToAMQP() throws
Throwable {
+ testCorrelationIDAsBytesSendReceive(createConnection(),
createConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsBytesSendReceiveFromAMQPToCore() throws
Throwable {
+ testCorrelationIDAsBytesSendReceive(createConnection(),
createCoreConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsBytesSendReceiveFromAMQPToOpenWire() throws
Throwable {
+ testCorrelationIDAsBytesSendReceive(createConnection(),
createOpenWireConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsBytesSendReceiveFromCoreToCore() throws
Throwable {
+ testCorrelationIDAsBytesSendReceive(createCoreConnection(),
createCoreConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsBytesSendReceiveFromCoreToAMQP() throws
Throwable {
+ testCorrelationIDAsBytesSendReceive(createCoreConnection(),
createConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsBytesSendReceiveFromCoreToOpenWire() throws
Throwable {
+ testCorrelationIDAsBytesSendReceive(createCoreConnection(),
createOpenWireConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsBytesSendReceiveFromOpenWireToOpenWire()
throws Throwable {
+ testCorrelationIDAsBytesSendReceive(createOpenWireConnection(),
createOpenWireConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsBytesSendReceiveFromOpenWireToAMQP() throws
Throwable {
+ testCorrelationIDAsBytesSendReceive(createOpenWireConnection(),
createConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsBytesSendReceiveFromOpenWireToCore() throws
Throwable {
+ testCorrelationIDAsBytesSendReceive(createOpenWireConnection(),
createCoreConnection());
+ }
+
+ private void testCorrelationIDAsStringSendReceive(Connection
producerConnection, Connection consumerConnection) throws Throwable {
+ final String correlationId = RandomUtil.randomString();
+
+ Session session = producerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getQueueName());
+
+ MessageProducer producer = session.createProducer(queue);
+ Message message = session.createMessage();
+ message.setJMSCorrelationID(correlationId);
+ producer.send(message);
+ producer.close();
+
+ Session sessionConsumer = consumerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue consumerQueue = sessionConsumer.createQueue(getQueueName());
+ final MessageConsumer consumer =
sessionConsumer.createConsumer(consumerQueue);
+
+ Message m = consumer.receive(5000);
+ Assert.assertNotNull("Could not receive message on consumer", m);
+
+ Assert.assertEquals(correlationId, m.getJMSCorrelationID());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsStringSendReceiveFromAMQPToAMQP() throws
Throwable {
+ testCorrelationIDAsStringSendReceive(createConnection(),
createConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsStringSendReceiveFromAMQPToCore() throws
Throwable {
+ testCorrelationIDAsStringSendReceive(createConnection(),
createCoreConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsStringSendReceiveFromAMQPToOpenWire() throws
Throwable {
+ testCorrelationIDAsStringSendReceive(createConnection(),
createOpenWireConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsStringSendReceiveFromCoreToCore() throws
Throwable {
+ testCorrelationIDAsStringSendReceive(createCoreConnection(),
createCoreConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsStringSendReceiveFromCoreToAMQP() throws
Throwable {
+ testCorrelationIDAsStringSendReceive(createCoreConnection(),
createConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsStringSendReceiveFromCoreToOpenWire() throws
Throwable {
+ testCorrelationIDAsStringSendReceive(createCoreConnection(),
createOpenWireConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsStringSendReceiveFromOpenWireToOpenWire()
throws Throwable {
+ testCorrelationIDAsStringSendReceive(createOpenWireConnection(),
createOpenWireConnection());
+ }
+
+ /*
+ * JMS supports setting the correlation ID as a String or a byte[].
However, OpenWire only supports correlation ID as
+ * a String. When it is set as a byte[] the OpenWire JMS client just
converts it to a UTF-8 encoded String, and
+ * therefore when it sends a JMS message with a correlation ID the broker
can't tell if the value was set as a String
+ * or a byte[]. Due to this ambiguity the broker is hard-coded to treat the
value as a byte[]. This doesn't cause any
+ * problems if the consumer is also OpenWire, but if the consumer is core
or AMQP (which both differentiate between
+ * String and binary values) then retrieving the correlation ID as a String
(i.e. via Message.getJMSCorrelationID())
+ * will fail.
+ *
+ * JMS means for the correlation ID as a byte[] to be used for "native"
clients which makes it a good candidate for
+ * interoperability between other protocols like MQTT 5 which *only*
supports correlation ID as byte[].
+ */
+ @Ignore
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsStringSendReceiveFromOpenWireToAMQP() throws
Throwable {
+ testCorrelationIDAsStringSendReceive(createOpenWireConnection(),
createConnection());
+ }
Review Comment:
Seems that you actually hit the interop issues I commented on from looking
at the code.
Why is the broker 'hard coded to byte[]' for Openwire when this comment
explicitly notes it effectively only does String? Why isnt it hard coded to
using String...like it was before?
If MQTT only supports byte[] then it seem like it is the MQTT stuff that
should be jumping through hoops such as converting to/from a UTF-8 bytes, not
the Openwire bits, especially as doing it this way breaks the typical+existing
Openwire<->AMQP/Core interop.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]