jeanouii commented on code in PR #1659:
URL: https://github.com/apache/activemq/pull/1659#discussion_r2794236552
##########
activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java:
##########
@@ -20,13 +20,16 @@
import java.io.InterruptedIOException;
import java.net.URI;
import java.security.cert.X509Certificate;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.sql.PreparedStatement;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
Review Comment:
Wildcard imports are discouraged in the project, even though it's not a blocker.
##########
activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java:
##########
@@ -94,6 +97,30 @@ public void oneway(Object command) throws IOException {
throw new TransportDisposedIOException("Peer (" +
peer.toString() + ") disposed.");
}
+ // Deep copy the message if it is a MessageDispatch
+ Object toSend = command;
+ if (command instanceof MessageDispatch) {
+ MessageDispatch original = (MessageDispatch) command;
+ try {
+ WireFormat wf = new OpenWireFormat();
+ ByteSequence data = wf.marshal(original);
+ toSend = wf.unmarshal(data); // deep copy
+ } catch (IOException e) {
+ LOG.warn("Failed to deep copy MessageDispatch, sending original", e);
+ toSend = command;
+ }
+ } else if (command instanceof ActiveMQMessage) {
+ ActiveMQMessage original = (ActiveMQMessage) command;
+ try {
+ WireFormat wf = new OpenWireFormat();
+ ByteSequence data = wf.marshal(original);
+ toSend = (ActiveMQMessage) wf.unmarshal(data);
+ } catch (IOException e) {
+ LOG.warn("Failed to marshal/unmarshal ActiveMQMessage, sending original", e);
Review Comment:
Same
##########
activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java:
##########
@@ -124,6 +151,10 @@ public void oneway(Object command) throws IOException {
return;
}
}
+
+ // Dispatch to listener
+ dispatch(peer, peer.messageQueue, toSend);
Review Comment:
Should we have a return after this one to avoid the second dispatch below?
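For illustration, a minimal sketch of the concern, not taken from the PR. `SingleDispatchSketch`, `delivered`, and the `returnAfterFirstDispatch` flag are hypothetical names that only model the control flow: a newly added dispatch followed by the dispatch that already existed further down in `oneway`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the control flow in oneway(); not VMTransport code.
final class SingleDispatchSketch {
    // Records every delivery so a double dispatch is easy to observe.
    final List<Object> delivered = new ArrayList<>();

    void oneway(Object command, boolean returnAfterFirstDispatch) {
        // Models the dispatch added by the PR.
        delivered.add(command);
        if (returnAfterFirstDispatch) {
            return; // stop here so the command is delivered exactly once
        }
        // Models the dispatch that already existed further down.
        delivered.add(command);
    }
}
```

Without the early return, the same command reaches the listener twice in this model.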
##########
activemq-unit-tests/src/test/java/org/apache/activemq/VmTransportBrokerRestartTest.java:
##########
Review Comment:
This PR should be rebased against apache/main so that this class no longer shows up in the diff.
##########
activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java:
##########
@@ -94,6 +97,30 @@ public void oneway(Object command) throws IOException {
throw new TransportDisposedIOException("Peer (" +
peer.toString() + ") disposed.");
}
+ // Deep copy the message if it is a MessageDispatch
+ Object toSend = command;
+ if (command instanceof MessageDispatch) {
+ MessageDispatch original = (MessageDispatch) command;
+ try {
+ WireFormat wf = new OpenWireFormat();
+ ByteSequence data = wf.marshal(original);
+ toSend = wf.unmarshal(data); // deep copy
+ } catch (IOException e) {
+ LOG.warn("Failed to deep copy MessageDispatch, sending original", e);
+ toSend = command;
Review Comment:
I'm wondering whether this fallback is desired, to be honest. As I see it, the
goal of this PR (and it's a great one) is to have the VM transport behave the
same as the remote transports, the benefit being that others in the same JVM
can't mutate the message. Great!
Now, if we can't serialize/deserialize to create a deep copy and we still
send the original, we might introduce a case where VM works when remote
does not. So I'm tempted to just fail here. What do you think?
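For illustration, a minimal sketch of the fail-fast alternative. It is an assumption-laden stand-in: it uses plain `java.io` serialization instead of the OpenWire marshal/unmarshal round trip in the PR, and `DeepCopySketch` and `deepCopyOrFail` are hypothetical names. The point is only that a copy failure propagates as an `IOException` rather than silently sending the original.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical illustration; not part of the PR or of ActiveMQ.
final class DeepCopySketch {

    // Round-trips the value through serialization to obtain an independent copy.
    // Any failure surfaces as an IOException; there is no fallback to the original.
    static <T extends Serializable> T deepCopyOrFail(T original) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(original);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        } catch (ClassNotFoundException e) {
            throw new IOException("Failed to deep copy, refusing to send original", e);
        }
    }
}
```

With this shape the caller sees the same kind of failure it would see on a remote transport, instead of a VM-only success path.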
##########
activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java:
##########
@@ -68,6 +72,9 @@ public class VMTransport implements Transport, Task {
private volatile int receiveCounter;
+ private final List<TransportListener> listeners = new CopyOnWriteArrayList<>();
+ private final ExecutorService executor = Executors.newCachedThreadPool();
Review Comment:
I'm probably blind or my search/replace does not work properly. Where are
the 2 fields used so far?
##########
activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java:
##########
@@ -20,13 +20,16 @@
import java.io.InterruptedIOException;
import java.net.URI;
import java.security.cert.X509Certificate;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.sql.PreparedStatement;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.activemq.command.ShutdownInfo;
+import jakarta.jms.JMSException;
+import org.apache.activemq.command.*;
Review Comment:
Same here (wildcard import), FYI
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]