This is an automated email from the ASF dual-hosted git repository.

havret pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-nms-amqp.git


The following commit(s) were added to refs/heads/main by this push:
     new c96b0ac  AMQNET-846 Add option to customize acknowledgment behavior for expired messages via IRedeliveryPolicy
c96b0ac is described below

commit c96b0ac53acda38b5d3e7cc07c3bab507e961431
Author: Havret <[email protected]>
AuthorDate: Sat Jun 7 00:05:39 2025 +0200

    AMQNET-846 Add option to customize acknowledgment behavior for expired messages via IRedeliveryPolicy
---
 src/NMS.AMQP/Apache-NMS-AMQP.csproj                |   2 +-
 src/NMS.AMQP/NmsMessageConsumer.cs                 |  29 ++-
 src/NMS.AMQP/Policies/DefaultRedeliveryPolicy.cs   |  28 +++
 src/NMS.AMQP/Policies/RedeliveryPolicy.cs          |  28 ---
 .../Integration/ConsumerIntegrationTest.cs         | 273 +++++++++++++++++++++
 test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs |   7 +
 6 files changed, 336 insertions(+), 31 deletions(-)

diff --git a/src/NMS.AMQP/Apache-NMS-AMQP.csproj b/src/NMS.AMQP/Apache-NMS-AMQP.csproj
index aa03182..e4335a1 100644
--- a/src/NMS.AMQP/Apache-NMS-AMQP.csproj
+++ b/src/NMS.AMQP/Apache-NMS-AMQP.csproj
@@ -93,7 +93,7 @@ with the License.  You may obtain a copy of the License at
     <ItemGroup>
         <!-- AMQPNetLite.Core is .NET Standard 1.3 package -->
         <PackageReference Include="AMQPNetLite.Core" Version="2.4.3" />
-        <PackageReference Include="Apache.NMS" Version="2.1.0" />
+        <PackageReference Include="Apache.NMS" Version="2.2.0" />
         <PackageReference Include="System.Threading.Tasks.Dataflow" 
Version="4.9.0" />
     </ItemGroup>
 </Project>
diff --git a/src/NMS.AMQP/NmsMessageConsumer.cs b/src/NMS.AMQP/NmsMessageConsumer.cs
index d37b55a..73206cd 100644
--- a/src/NMS.AMQP/NmsMessageConsumer.cs
+++ b/src/NMS.AMQP/NmsMessageConsumer.cs
@@ -544,7 +544,6 @@ namespace Apache.NMS.AMQP
                             Tracer.Debug($"{Info.Id} filtered message with excessive redelivery count: {envelope.RedeliveryCount.ToString()}");
                         }
 
-                        // TODO: Apply redelivery policy
                         await DoAckExpiredAsync(envelope).Await();
                     }
                     else
@@ -602,7 +601,33 @@ namespace Apache.NMS.AMQP
 
         private Task DoAckExpiredAsync(InboundMessageDispatch envelope)
         {
-            return Session.AcknowledgeAsync(AckType.MODIFIED_FAILED_UNDELIVERABLE, envelope);
+            if (Session.Connection.RedeliveryPolicy != null)
+            {
+                var dispositionType = Session.Connection.RedeliveryPolicy.GetOutcome(envelope.Message.NMSDestination);
+                var ackType = LookupAckTypeForDisposition(dispositionType);
+                return Session.AcknowledgeAsync(ackType, envelope);
+            }
+            else
+            {
+                return Session.AcknowledgeAsync(AckType.MODIFIED_FAILED_UNDELIVERABLE, envelope);
+            }
+        }
+
+        private static AckType LookupAckTypeForDisposition(int dispositionType)
+        {
+            var ackType = (AckType) dispositionType;
+
+            switch (ackType)
+            {
+                case AckType.ACCEPTED:
+                case AckType.REJECTED:
+                case AckType.RELEASED:
+                case AckType.MODIFIED_FAILED:
+                case AckType.MODIFIED_FAILED_UNDELIVERABLE:
+                    return ackType;
+                default:
+                    throw new ArgumentOutOfRangeException(nameof(dispositionType), "Unknown disposition type");
+            }
         }
 
         private void SetAcknowledgeCallback(InboundMessageDispatch envelope)
diff --git a/src/NMS.AMQP/Policies/DefaultRedeliveryPolicy.cs b/src/NMS.AMQP/Policies/DefaultRedeliveryPolicy.cs
index e69de29..407559c 100644
--- a/src/NMS.AMQP/Policies/DefaultRedeliveryPolicy.cs
+++ b/src/NMS.AMQP/Policies/DefaultRedeliveryPolicy.cs
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Apache.NMS.AMQP.Message;
+
+namespace Apache.NMS.AMQP.Policies;
+
+public class DefaultRedeliveryPolicy : Apache.NMS.Policies.RedeliveryPolicy
+{
+    public override int GetOutcome(IDestination destination)
+    {
+        return (int) AckType.MODIFIED_FAILED_UNDELIVERABLE;
+    }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Policies/RedeliveryPolicy.cs b/src/NMS.AMQP/Policies/RedeliveryPolicy.cs
deleted file mode 100644
index 28a5355..0000000
--- a/src/NMS.AMQP/Policies/RedeliveryPolicy.cs
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-
-namespace Apache.NMS.AMQP.Policies
-{
-    class RedeliveryPolicy
-    {
-    }
-}
diff --git a/test/Apache-NMS-AMQP-Test/Integration/ConsumerIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/ConsumerIntegrationTest.cs
index 6bfb1f5..b5af7de 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/ConsumerIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/ConsumerIntegrationTest.cs
@@ -23,6 +23,7 @@ using Amqp.Framing;
 using Apache.NMS;
 using Apache.NMS.AMQP;
 using Apache.NMS.AMQP.Message;
+using Apache.NMS.AMQP.Policies;
 using Apache.NMS.AMQP.Util;
 using Moq;
 using NMS.AMQP.Test.TestAmqp;
@@ -1097,5 +1098,277 @@ namespace NMS.AMQP.Test.Integration
                 testPeer.WaitForAllMatchersToComplete(2000);
             }
         }
+
+        [Test, Timeout(20_000)]
+        public void TestMessageNackedWhenRedeliveryCountExceeded()
+        {
+            using (var testPeer = new TestAmqpPeer())
+            {
+                var connection = EstablishConnection(testPeer);
+                connection.RedeliveryPolicy = new DefaultRedeliveryPolicy { 
MaximumRedeliveries = 1 };
+                connection.Start();
+
+                testPeer.ExpectBegin();
+
+                var session = 
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+                var queue = session.GetQueue("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: new 
Amqp.Message { BodySection = new AmqpValue { Value = "hello" } });
+                testPeer.ExpectDispositionThatIsModifiedFailedAndSettled();
+
+                IMessageConsumer consumer = session.CreateConsumer(queue);
+
+                IMessage m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+                Assert.NotNull(m, "Message should have been received");
+                Assert.IsInstanceOf<ITextMessage>(m);
+                session.Recover();
+
+                m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+
+                Assert.NotNull(m, "Message should have been received");
+                Assert.IsInstanceOf<ITextMessage>(m);
+                session.Recover();
+
+                // Verify the message is no longer there. Will drain to be 
sure there are no messages.
+                Assert.IsNull(consumer.Receive(TimeSpan.Zero), "Message should 
not have been received");
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(3000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestMessageAcceptedWhenRedeliveryCountExceeded()
+        {
+            using (var testPeer = new TestAmqpPeer())
+            {
+                var connection = EstablishConnection(testPeer);
+                connection.RedeliveryPolicy = new CustomRedeliveryPolicy { 
MaximumRedeliveries = 1, Outcome = 0 };
+                connection.Start();
+
+                testPeer.ExpectBegin();
+
+                var session = 
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+                var queue = session.GetQueue("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: new 
Amqp.Message { BodySection = new AmqpValue { Value = "hello" } });
+                testPeer.ExpectDisposition(settled: true, state =>
+                {
+                    Assert.AreEqual(state.Descriptor.Code, 
MessageSupport.ACCEPTED_INSTANCE.Descriptor.Code);
+                });
+
+                var consumer = session.CreateConsumer(queue);
+
+                var m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+                Assert.NotNull(m, "Message should have been received");
+                Assert.IsInstanceOf<ITextMessage>(m);
+                session.Recover();
+
+                m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+
+                Assert.NotNull(m, "Message should have been received");
+                Assert.IsInstanceOf<ITextMessage>(m);
+                session.Recover();
+
+                // Verify the message is no longer there. Will drain to be 
sure there are no messages.
+                Assert.IsNull(consumer.Receive(TimeSpan.Zero), "Message should 
not have been received");
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(3000);
+            }
+        }
+        
+        [Test, Timeout(20_000)]
+        public void TestMessageRejectedWhenRedeliveryCountExceeded()
+        {
+            using (var testPeer = new TestAmqpPeer())
+            {
+                var connection = EstablishConnection(testPeer);
+                connection.RedeliveryPolicy = new CustomRedeliveryPolicy { 
MaximumRedeliveries = 1, Outcome = 1 };
+                connection.Start();
+
+                testPeer.ExpectBegin();
+
+                var session = 
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+                var queue = session.GetQueue("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: new 
Amqp.Message { BodySection = new AmqpValue { Value = "hello" } });
+                testPeer.ExpectDisposition(settled: true, state =>
+                {
+                    Assert.AreEqual(state.Descriptor.Code, 
MessageSupport.REJECTED_INSTANCE.Descriptor.Code);
+                });
+
+                var consumer = session.CreateConsumer(queue);
+
+                var m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+                Assert.NotNull(m, "Message should have been received");
+                Assert.IsInstanceOf<ITextMessage>(m);
+                session.Recover();
+
+                m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+
+                Assert.NotNull(m, "Message should have been received");
+                Assert.IsInstanceOf<ITextMessage>(m);
+                session.Recover();
+
+                // Verify the message is no longer there. Will drain to be 
sure there are no messages.
+                Assert.IsNull(consumer.Receive(TimeSpan.Zero), "Message should 
not have been received");
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(3000);
+            }
+        }
+        
+        [Test, Timeout(20_000)]
+        public void TestMessageReleasedWhenRedeliveryCountExceeded()
+        {
+            using (var testPeer = new TestAmqpPeer())
+            {
+                var connection = EstablishConnection(testPeer);
+                connection.RedeliveryPolicy = new CustomRedeliveryPolicy { 
MaximumRedeliveries = 1, Outcome = 2 };
+                connection.Start();
+
+                testPeer.ExpectBegin();
+
+                var session = 
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+                var queue = session.GetQueue("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: new 
Amqp.Message { BodySection = new AmqpValue { Value = "hello" } });
+                testPeer.ExpectDisposition(settled: true, state =>
+                {
+                    Assert.AreEqual(state.Descriptor.Code, 
MessageSupport.RELEASED_INSTANCE.Descriptor.Code);
+                });
+
+                var consumer = session.CreateConsumer(queue);
+
+                var m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+                Assert.NotNull(m, "Message should have been received");
+                Assert.IsInstanceOf<ITextMessage>(m);
+                session.Recover();
+
+                m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+
+                Assert.NotNull(m, "Message should have been received");
+                Assert.IsInstanceOf<ITextMessage>(m);
+                session.Recover();
+
+                // Verify the message is no longer there. Will drain to be 
sure there are no messages.
+                Assert.IsNull(consumer.Receive(TimeSpan.Zero), "Message should 
not have been received");
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(3000);
+            }
+        }
+        
+        [Test, Timeout(20_000)]
+        public void TestMessageModifiedFailedWhenRedeliveryCountExceeded()
+        {
+            using (var testPeer = new TestAmqpPeer())
+            {
+                var connection = EstablishConnection(testPeer);
+                connection.RedeliveryPolicy = new CustomRedeliveryPolicy { 
MaximumRedeliveries = 1, Outcome = 3 };
+                connection.Start();
+
+                testPeer.ExpectBegin();
+
+                var session = 
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+                var queue = session.GetQueue("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: new 
Amqp.Message { BodySection = new AmqpValue { Value = "hello" } });
+                testPeer.ExpectDisposition(settled: true, state =>
+                {
+                    Assert.AreEqual(state.Descriptor.Code, 
MessageSupport.MODIFIED_INSTANCE.Descriptor.Code);
+                });
+
+                var consumer = session.CreateConsumer(queue);
+
+                var m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+                Assert.NotNull(m, "Message should have been received");
+                Assert.IsInstanceOf<ITextMessage>(m);
+                session.Recover();
+
+                m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+
+                Assert.NotNull(m, "Message should have been received");
+                Assert.IsInstanceOf<ITextMessage>(m);
+                session.Recover();
+
+                // Verify the message is no longer there. Will drain to be 
sure there are no messages.
+                Assert.IsNull(consumer.Receive(TimeSpan.Zero), "Message should 
not have been received");
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(3000);
+            }
+        }
+        
+        [Test, Timeout(20_000)]
+        public void 
TestMessageModifiedFailedUndeliverableWhenRedeliveryCountExceeded()
+        {
+            using (var testPeer = new TestAmqpPeer())
+            {
+                var connection = EstablishConnection(testPeer);
+                connection.RedeliveryPolicy = new CustomRedeliveryPolicy { 
MaximumRedeliveries = 1, Outcome = 4 };
+                connection.Start();
+
+                testPeer.ExpectBegin();
+
+                var session = 
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+                var queue = session.GetQueue("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: new 
Amqp.Message { BodySection = new AmqpValue { Value = "hello" } });
+                testPeer.ExpectDisposition(settled: true, state =>
+                {
+                    Assert.AreEqual(state.Descriptor.Code, 
MessageSupport.MODIFIED_INSTANCE.Descriptor.Code);
+                    Assert.IsTrue(state is Modified modified && 
modified.DeliveryFailed && modified.UndeliverableHere);
+                });
+
+                var consumer = session.CreateConsumer(queue);
+
+                var m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+                Assert.NotNull(m, "Message should have been received");
+                Assert.IsInstanceOf<ITextMessage>(m);
+                session.Recover();
+
+                m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+
+                Assert.NotNull(m, "Message should have been received");
+                Assert.IsInstanceOf<ITextMessage>(m);
+                session.Recover();
+
+                // Verify the message is no longer there. Will drain to be 
sure there are no messages.
+                Assert.IsNull(consumer.Receive(TimeSpan.Zero), "Message should 
not have been received");
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(3000);
+            }
+        }
+
+        private class CustomRedeliveryPolicy : 
Apache.NMS.Policies.RedeliveryPolicy
+        {
+            public int Outcome { get; set; }
+            
+            public override int GetOutcome(IDestination destination)
+            {
+                return Outcome;
+            }
+        }
     }
 }
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
index e61cc88..d59329e 100644
--- a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
+++ b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
@@ -726,6 +726,13 @@ namespace NMS.AMQP.Test.TestAmqp
 
             ExpectDisposition(settled: true, stateMatcher: stateMatcher);
         }
+        
+        public void ExpectDispositionThatIsModifiedFailedAndSettled()
+        {
+            Action<DeliveryState> stateMatcher = state => { Assert.AreEqual(state.Descriptor.Code, MessageSupport.MODIFIED_FAILED_INSTANCE.Descriptor.Code); };
+            
+            ExpectDisposition(settled: true, stateMatcher: stateMatcher);
+        }
 
         public void ExpectDisposition(bool settled, Action<DeliveryState> stateMatcher, uint? firstDeliveryId = null, uint? lastDeliveryId = null)
         {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to