arnoudja opened a new pull request, #1851:
URL: https://github.com/apache/activemq/pull/1851

   We run a network-of-brokers setup. On all servers, subscribers use OpenWire 
to receive text messages from topics.
   
   When a text message is posted to a topic on one of the servers using AMQP, 
a race condition occurs: roughly 1% of the messages are read as empty 
messages on another server.
   
   I'm not an expert on the ActiveMQ architecture, its use cases, or Java, so 
please correct me if I draw wrong conclusions. With the help of Codex, it 
looks to me like ActiveMQTextMessage is over-optimized. As far as I 
understand, it is not supposed to be used by multiple threads concurrently, 
but it is. As a result, when copy() and beforeMarshall() are called at roughly 
the same time, the copy can end up completely empty. The analysis follows 
below.
   
   A different but related pull request is this: 
https://github.com/apache/activemq/pull/1659/changes
   
   I've looked at four solutions, each with its own downside. In the end, this 
pull request looks like the best compromise to me.
   
   1) Add synchronisation to ActiveMQTextMessage
   This does not fit the architecture, is not in line with other classes like 
ActiveMQMapMessage, and adds overhead. Besides that, it will not solve all 
problems: the sequence where beforeMarshall() runs, getText() is called, and 
marshalling then continues will still fail.
   
   2) Swap the order in copy() so text is copied first instead of last
   Though this will probably have the least impact, it feels like a nasty 
solution. It would need a lot of explanation around that code to avoid 
regression after future changes. Also, it wouldn't solve the problems 
completely.
   
   3) Avoid the concurrent use of the same instance by performing a copy before 
calling beforeMarshall
   This looks like the best long-term option to me, but it would have a lot of 
impact: a high risk of introducing other bugs and performance issues, even in 
use cases where text messages aren't used at all.
   
   4) Remove the over-optimization by not clearing text / content when the 
other is filled
   Not a perfect solution, and it will lead to higher memory usage, but it is 
in line with ActiveMQMapMessage and ActiveMQObjectMessage. This is what this 
pull request implements.
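   
   To illustrate the idea behind option 4, here is a minimal sketch using a 
hypothetical stand-in class. KeepBothTextMessage, storeContent() and 
replayInterleaving() are names made up for this example; this is not the real 
ActiveMQTextMessage and not the actual patch. It replays, on a single thread, 
the copy()/beforeMarshall() interleaving from the analysis further down, 
assuming marshalling fills content but no longer clears text.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for ActiveMQTextMessage; NOT the real class or the actual patch.
class KeepBothDemo {
    public static void main(String[] args) {
        KeepBothTextMessage copy = KeepBothTextMessage.replayInterleaving("hello");
        System.out.println("copied text = " + copy.text);
    }
}

class KeepBothTextMessage {
    byte[] content; // marshalled body
    String text;    // unmarshalled body

    // Models beforeMarshall() under option 4: fill content, leave text in place.
    void storeContent() {
        if (content == null && text != null) {
            content = text.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Replays the copy()/beforeMarshall() interleaving deterministically.
    static KeepBothTextMessage replayInterleaving(String body) {
        KeepBothTextMessage source = new KeepBothTextMessage();
        source.text = body;
        KeepBothTextMessage copy = new KeepBothTextMessage();
        copy.content = source.content; // bridge thread: copies content, still null
        source.storeContent();         // transport thread: marshals, text is kept
        copy.text = source.text;       // bridge thread: text is still available
        return copy;
    }
}
```

   In this model the copy keeps its text even though content was copied 
before it was filled. The price is that both representations can be held in 
memory at the same time.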
   
   The analysis of the bug was done on an older branch, but as far as I can see 
it is still not solved:
   
   The race originated because the broker dispatched the same 
ActiveMQTextMessage instance to two different subscribers at roughly the same 
time:
   
     - the local OpenWire consumer
     - the local side of the network bridge
   
     You can see that the broker puts the same message object into each 
MessageDispatch, not a copy, in
     activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java:644
     and
     activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java:782.
   
     The two methods racing
   
     1. ActiveMQTextMessage.copy()
     2. ActiveMQTextMessage.beforeMarshall()
   
     beforeMarshall() is the mutating path. In
     activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java:123
     it calls storeContentAndClear(), which:
   
     - serializes text into content
     - then sets text = null
   
     The copy() implementation read those two pieces of body state separately:
   
     - super.copy(copy) copied content
     - then copy.text = text
   
     That gave a race window between those two reads.
   
     Where the two calls came from
   
     - copy() came from the network bridge forwarding path:
       activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java:1189
       -> activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java:1172
       -> md.getMessage().copy()
     - beforeMarshall() came from the local OpenWire consumer delivery path:
       topic/prefetch subscription dispatch
       -> activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java:936
       -> activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java:971
       -> activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java:1482
       -> OpenWire marshal
       -> activemq-client/src/main/java/org/apache/activemq/openwire/v12/MessageMarshaller.java:118 or looseMarshal()
       -> info.beforeMarshall(wireFormat)
   
     The network bridge gets its commands from the local transport listener in
     activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java:214,
     which calls serviceLocalCommand(command).
   
     The failing interleaving
     The bad sequence was:
   
     1. Network bridge thread enters copy()
     2. super.copy(copy) sees content == null, so copied message gets no content
     3. Local OpenWire transport thread enters beforeMarshall()
     4. storeContentAndClear() serializes body and then sets source text = null
     5. Network bridge thread resumes and does copy.text = text
     6. It now reads text == null
   
     Result: the forwarded copy ended with both content == null and text == 
null, so the remote OpenWire consumer received a TextMessage with a null body.
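   
     The six steps above can be replayed deterministically on a single thread. 
This is only a sketch with a hypothetical stand-in class: ModelTextMessage and 
replayFailingInterleaving() are names invented for this example and are not 
the real ActiveMQTextMessage. Only storeContentAndClear() mirrors a method 
name from the analysis, and its body here is a simplified assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical stand-in for ActiveMQTextMessage; NOT the real class.
class RaceReplay {
    public static void main(String[] args) {
        ModelTextMessage copy = ModelTextMessage.replayFailingInterleaving("hello");
        System.out.println("content=" + Arrays.toString(copy.content) + ", text=" + copy.text);
    }
}

class ModelTextMessage {
    byte[] content; // marshalled body
    String text;    // unmarshalled body

    // Models beforeMarshall() -> storeContentAndClear(): serialize, then drop text.
    void storeContentAndClear() {
        if (content == null && text != null) {
            content = text.getBytes(StandardCharsets.UTF_8);
            text = null;
        }
    }

    // Executes the two threads' steps in the failing order, one after another.
    static ModelTextMessage replayFailingInterleaving(String body) {
        ModelTextMessage source = new ModelTextMessage();
        source.text = body;
        ModelTextMessage copy = new ModelTextMessage();
        copy.content = source.content; // steps 1-2: bridge thread copies content (null)
        source.storeContentAndClear(); // steps 3-4: transport thread marshals, clears text
        copy.text = source.text;       // steps 5-6: bridge thread copies text (now null)
        return copy;
    }
}
```

     In this model the source still holds the serialized body afterwards, but 
the copy has neither content nor text, which matches the empty message seen on 
the remote broker.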
   
   
   During the analysis I also found another possible bug. I have not tried to 
reproduce it yet. The setup:
   
     - a topic has one normal OpenWire consumer
     - the same topic also has another consumer using an XPath selector
     - the normal consumer is using async dispatch, which is the default on 
ActiveMQConnectionFactory 
(activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java:126)
   
     Why this is realistic:
   
     - async consumer dispatch is normal
     - XPath selectors are a built-in broker feature, not a test-only trick
   
     How the overlap happens
   
     During topic fanout, the broker iterates subscriptions in
     activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java:37:
   
     1. For subscription A, sub.add(node) is called.
     2. In activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java:96,
        if the subscription can take the message immediately, it calls dispatch(node)
        (activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java:113).
     3. That creates a MessageDispatch holding the shared Message instance and 
        queues it to the connection with
        activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java:700.
   
     At that point the transport/task-runner thread for consumer A can start 
marshalling the message, which eventually leads to beforeMarshall().
   
     Meanwhile, the broker dispatch thread is still inside the same fanout loop 
and moves on to subscription B:
   
     4. sub.matches(node, msgContext) is called in
        activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java:43.
     5. That goes through
        activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java:101.
     6. If B uses an XPath selector,
        activemq-client/src/main/java/org/apache/activemq/filter/XPathExpression.java:148
        calls the evaluator on message.getMessage().
     7. The broker-side XPath evaluator then does ((TextMessage) message).getText() in
        activemq-broker/src/main/java/org/apache/activemq/filter/XalanXPathEvaluator.java:49.
   
     So you can get this real overlap:
   
     - Thread 1: local OpenWire transport path calls beforeMarshall()
     - Thread 2: broker dispatch thread evaluates an XPath selector and calls 
getText()
   
     And both operate on the same shared ActiveMQTextMessage instance.
   
     So the scenario requires:
   
     - topic fanout to multiple subscriptions
     - async dispatch on one subscription
     - an XPath selector on another subscription
   
     That is less common than ordinary selectors, but it is absolutely a 
supported runtime path, not just a theoretical one.
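   
     A sketch of why this overlap could hurt, again with a hypothetical 
stand-in class (ClearingTextMessage and replaySelectorOverlap() are invented 
names, not ActiveMQ code). It assumes that getText() decodes content and then 
clears it, the mirror image of storeContentAndClear(), which is the behaviour 
option 4 above describes as "clearing text / content when the other is 
filled". Like the bug itself, this has not been verified against a running 
broker.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for ActiveMQTextMessage; NOT the real class.
class SelectorOverlapDemo {
    public static void main(String[] args) {
        byte[] wire = ClearingTextMessage.replaySelectorOverlap("<a/>");
        System.out.println("bytes left to marshal: "
                + (wire == null ? "none" : String.valueOf(wire.length)));
    }
}

class ClearingTextMessage {
    byte[] content; // marshalled body
    String text;    // unmarshalled body

    // Models beforeMarshall() -> storeContentAndClear().
    void storeContentAndClear() {
        if (content == null && text != null) {
            content = text.getBytes(StandardCharsets.UTF_8);
            text = null;
        }
    }

    // ASSUMPTION: getText() decodes content and then clears content.
    String getText() {
        if (text == null && content != null) {
            text = new String(content, StandardCharsets.UTF_8);
            content = null;
        }
        return text;
    }

    // Replays beforeMarshall(), getText(), then "continue marshalling" in order.
    static byte[] replaySelectorOverlap(String body) {
        ClearingTextMessage shared = new ClearingTextMessage();
        shared.text = body;
        shared.storeContentAndClear(); // thread 1: transport path, beforeMarshall()
        shared.getText();              // thread 2: XPath selector evaluation
        return shared.content;         // thread 1: reads content to write to the wire
    }
}
```

     Under that assumption, the marshalling thread finds no content left to 
write once the selector thread has called getText() in between, even though 
the text itself is intact on the shared instance.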
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

