cshannon commented on code in PR #1543:
URL: https://github.com/apache/activemq/pull/1543#discussion_r2996115073


##########
activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java:
##########
@@ -713,6 +714,144 @@ public Message receiveNoWait() throws JMSException {
         return createActiveMQMessage(md);
     }
 
+    /**
+     * Receives the next message produced for this message consumer and returns
+     * its body as an object of the specified type. This call blocks
+     * indefinitely until a message is produced or until this message consumer
+     * is closed.
+     * <p>
+     * If the message is not of a type for which the body can be assigned to
+     * the specified type, a {@code MessageFormatException} is thrown and the
+     * message is not acknowledged. It may be delivered again when a subsequent
+     * {@code receive} or {@code receiveBody} call is made.
+     *
+     * @param c the type to which the body of the next message should be
+     *          assigned
+     * @return the body of the next message, or null if this message consumer
+     *         is concurrently closed
+     * @throws MessageFormatException if the message body cannot be assigned to
+     *         the specified type
+     * @throws JMSException if the JMS provider fails to receive the next
+     *         message due to some internal error
+     */
+    public <T> T receiveBody(Class<T> c) throws JMSException {
+        checkClosed();
+        checkMessageListener();
+
+        sendPullCommand(0);
+        MessageDispatch md = dequeue(-1);
+        if (md == null) {
+            return null;
+        }
+
+        return doReceiveBody(md, c);
+    }
+
+    /**
+     * Receives the next message produced for this message consumer and returns
+     * its body as an object of the specified type, blocking up to the
+     * specified timeout. A {@code timeout} of zero never expires and the call
+     * blocks indefinitely.
+     * <p>
+     * If the message is not of a type for which the body can be assigned to
+     * the specified type, a {@code MessageFormatException} is thrown and the
+     * message is not acknowledged. It may be delivered again when a subsequent
+     * {@code receive} or {@code receiveBody} call is made.
+     *
+     * @param c       the type to which the body of the next message should be
+     *                assigned
+     * @param timeout the timeout value (in milliseconds), a timeout of zero
+     *                never expires
+     * @return the body of the next message, or null if the timeout expires or
+     *         this message consumer is concurrently closed
+     * @throws MessageFormatException if the message body cannot be assigned to
+     *         the specified type
+     * @throws JMSException if the JMS provider fails to receive the next
+     *         message due to some internal error
+     */
+    public <T> T receiveBody(Class<T> c, long timeout) throws JMSException {
+        checkClosed();
+        checkMessageListener();
+        if (timeout == 0) {
+            return this.receiveBody(c);
+        }
+
+        sendPullCommand(timeout);
+        while (timeout > 0) {
+            MessageDispatch md;
+            if (info.getPrefetchSize() == 0) {
+                md = dequeue(-1);
+            } else {
+                md = dequeue(timeout);
+            }
+
+            if (md == null) {
+                return null;
+            }
+
+            return doReceiveBody(md, c);
+        }
+        return null;
+    }
+
+    /**
+     * Receives the next message produced for this message consumer and returns
+     * its body as an object of the specified type if one is immediately
+     * available.
+     * <p>
+     * If the message is not of a type for which the body can be assigned to
+     * the specified type, a {@code MessageFormatException} is thrown and the
+     * message is not acknowledged. It may be delivered again when a subsequent
+     * {@code receive} or {@code receiveBody} call is made.
+     *
+     * @param c the type to which the body of the next message should be
+     *          assigned
+     * @return the body of the next message, or null if one is not immediately
+     *         available
+     * @throws MessageFormatException if the message body cannot be assigned to
+     *         the specified type
+     * @throws JMSException if the JMS provider fails to receive the next
+     *         message due to some internal error
+     */
+    public <T> T receiveBodyNoWait(Class<T> c) throws JMSException {
+        checkClosed();
+        checkMessageListener();
+        sendPullCommand(-1);
+
+        MessageDispatch md;
+        if (info.getPrefetchSize() == 0) {
+            md = dequeue(-1);
+        } else {
+            md = dequeue(0);
+        }
+
+        if (md == null) {
+            return null;
+        }
+
+        return doReceiveBody(md, c);
+    }
+
+    /**
+     * Checks that the message body can be assigned to the requested type,
+     * acknowledges the message, and returns its body.  If the body cannot be
+     * assigned, the message is re-enqueued without acknowledgement so that it
+     * remains available for a subsequent {@code receive} or
+     * {@code receiveBody} call.
+     */
+    private <T> T doReceiveBody(MessageDispatch md, Class<T> c) throws JMSException {
+        ActiveMQMessage message = createActiveMQMessage(md);
+        if (!message.isBodyAssignableTo(c)) {
+            // spec: message is not acknowledged and left for redelivery
+            unconsumedMessages.enqueueFirst(md);

Review Comment:
   @jbonofre - Did you have changes that got blown away by a force push? I 
thought at one point there was an update to handle the fact that, for 
transactions and individual ack, we should not be re-adding the message 
first (the `enqueueFirst` call here). I'm confused because there have been a 
lot of updates and force pushes, but this PR still isn't handling that part 
of the spec. There is also the comment @tabish121 made a few minutes ago 
about what happens if the type is Message or StreamMessage.
   
   At some point it might be easier to close this and open a new PR, as it's 
getting hard to follow everything with all the comments.
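
   To make the point about transactions and individual ack concrete, here is 
a minimal sketch of the decision being asked for. It assumes my reading of the 
JMS 2.0 `JMSConsumer.receiveBody` Javadoc: after a failed call, only 
auto-acknowledge and dups-ok sessions behave as if the call never happened, 
while client-acknowledge and transacted sessions treat the message as 
delivered. Grouping individual ack with client ack follows the comment above. 
Every name in the sketch (`FailedReceiveBodySketch`, `AckMode`, 
`shouldRequeueFirst`, `onBodyMismatch`) is hypothetical, and the two deques 
only stand in for the consumer's real bookkeeping; this is not ActiveMQ code.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model for discussion only; none of these names are ActiveMQ APIs.
final class FailedReceiveBodySketch {

    // INDIVIDUAL stands in for the individual-ack mode mentioned in the review.
    enum AckMode { AUTO, DUPS_OK, CLIENT, INDIVIDUAL, TRANSACTED }

    // Assumption: only auto-ack and dups-ok put the message back at the head.
    static boolean shouldRequeueFirst(AckMode mode) {
        return mode == AckMode.AUTO || mode == AckMode.DUPS_OK;
    }

    // Called when the body cannot be assigned to the requested type.
    // The caller would still throw MessageFormatException afterwards.
    static void onBodyMismatch(AckMode mode, String messageId,
                               Deque<String> unconsumed, Deque<String> delivered) {
        if (shouldRequeueFirst(mode)) {
            // Behave as if the failed call never happened: redeliver before anything else.
            unconsumed.addFirst(messageId);
        } else {
            // Behave as if the call succeeded: the message stays delivered and is
            // acknowledged, recovered, committed, or rolled back by the application.
            delivered.addLast(messageId);
        }
    }

    public static void main(String[] args) {
        for (AckMode mode : AckMode.values()) {
            Deque<String> unconsumed = new ArrayDeque<>();
            unconsumed.add("msg-2"); // a later message already waiting
            Deque<String> delivered = new ArrayDeque<>();
            onBodyMismatch(mode, "msg-1", unconsumed, delivered);
            System.out.println(mode + " unconsumed=" + unconsumed + " delivered=" + delivered);
        }
    }
}
```

   In the PR this would amount to branching on the session's acknowledgement 
mode before the `enqueueFirst` call rather than re-enqueueing unconditionally.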



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



