This is an automated email from the ASF dual-hosted git repository.
havret pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-nms-amqp.git
The following commit(s) were added to refs/heads/main by this push:
new c96b0ac AMQNET-846 Add option to customize acknowledgment behavior
for expired messages via IRedeliveryPolicy
c96b0ac is described below
commit c96b0ac53acda38b5d3e7cc07c3bab507e961431
Author: Havret <[email protected]>
AuthorDate: Sat Jun 7 00:05:39 2025 +0200
AMQNET-846 Add option to customize acknowledgment behavior for expired
messages via IRedeliveryPolicy
---
src/NMS.AMQP/Apache-NMS-AMQP.csproj | 2 +-
src/NMS.AMQP/NmsMessageConsumer.cs | 29 ++-
src/NMS.AMQP/Policies/DefaultRedeliveryPolicy.cs | 28 +++
src/NMS.AMQP/Policies/RedeliveryPolicy.cs | 28 ---
.../Integration/ConsumerIntegrationTest.cs | 273 +++++++++++++++++++++
test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs | 7 +
6 files changed, 336 insertions(+), 31 deletions(-)
diff --git a/src/NMS.AMQP/Apache-NMS-AMQP.csproj b/src/NMS.AMQP/Apache-NMS-AMQP.csproj
index aa03182..e4335a1 100644
--- a/src/NMS.AMQP/Apache-NMS-AMQP.csproj
+++ b/src/NMS.AMQP/Apache-NMS-AMQP.csproj
@@ -93,7 +93,7 @@ with the License. You may obtain a copy of the License at
<ItemGroup>
<!-- AMQPNetLite.Core is .NET Standard 1.3 package -->
<PackageReference Include="AMQPNetLite.Core" Version="2.4.3" />
- <PackageReference Include="Apache.NMS" Version="2.1.0" />
+ <PackageReference Include="Apache.NMS" Version="2.2.0" />
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.9.0" />
</ItemGroup>
</Project>
diff --git a/src/NMS.AMQP/NmsMessageConsumer.cs b/src/NMS.AMQP/NmsMessageConsumer.cs
index d37b55a..73206cd 100644
--- a/src/NMS.AMQP/NmsMessageConsumer.cs
+++ b/src/NMS.AMQP/NmsMessageConsumer.cs
@@ -544,7 +544,6 @@ namespace Apache.NMS.AMQP
Tracer.Debug($"{Info.Id} filtered message with excessive redelivery count: {envelope.RedeliveryCount.ToString()}");
}
- // TODO: Apply redelivery policy
await DoAckExpiredAsync(envelope).Await();
}
else
@@ -602,7 +601,33 @@ namespace Apache.NMS.AMQP
private Task DoAckExpiredAsync(InboundMessageDispatch envelope)
{
- return Session.AcknowledgeAsync(AckType.MODIFIED_FAILED_UNDELIVERABLE, envelope);
+ if (Session.Connection.RedeliveryPolicy != null)
+ {
+ var dispositionType = Session.Connection.RedeliveryPolicy.GetOutcome(envelope.Message.NMSDestination);
+ var ackType = LookupAckTypeForDisposition(dispositionType);
+ return Session.AcknowledgeAsync(ackType, envelope);
+ }
+ else
+ {
+ return Session.AcknowledgeAsync(AckType.MODIFIED_FAILED_UNDELIVERABLE, envelope);
+ }
+ }
+
+ private static AckType LookupAckTypeForDisposition(int dispositionType)
+ {
+ var ackType = (AckType) dispositionType;
+
+ switch (ackType)
+ {
+ case AckType.ACCEPTED:
+ case AckType.REJECTED:
+ case AckType.RELEASED:
+ case AckType.MODIFIED_FAILED:
+ case AckType.MODIFIED_FAILED_UNDELIVERABLE:
+ return ackType;
+ default:
+ throw new ArgumentOutOfRangeException(nameof(dispositionType), "Unknown disposition type");
+ }
}
private void SetAcknowledgeCallback(InboundMessageDispatch envelope)
diff --git a/src/NMS.AMQP/Policies/DefaultRedeliveryPolicy.cs b/src/NMS.AMQP/Policies/DefaultRedeliveryPolicy.cs
index e69de29..407559c 100644
--- a/src/NMS.AMQP/Policies/DefaultRedeliveryPolicy.cs
+++ b/src/NMS.AMQP/Policies/DefaultRedeliveryPolicy.cs
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Apache.NMS.AMQP.Message;
+
+namespace Apache.NMS.AMQP.Policies;
+
+public class DefaultRedeliveryPolicy : Apache.NMS.Policies.RedeliveryPolicy
+{
+ public override int GetOutcome(IDestination destination)
+ {
+ return (int) AckType.MODIFIED_FAILED_UNDELIVERABLE;
+ }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Policies/RedeliveryPolicy.cs b/src/NMS.AMQP/Policies/RedeliveryPolicy.cs
deleted file mode 100644
index 28a5355..0000000
--- a/src/NMS.AMQP/Policies/RedeliveryPolicy.cs
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-
-namespace Apache.NMS.AMQP.Policies
-{
- class RedeliveryPolicy
- {
- }
-}
diff --git a/test/Apache-NMS-AMQP-Test/Integration/ConsumerIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/ConsumerIntegrationTest.cs
index 6bfb1f5..b5af7de 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/ConsumerIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/ConsumerIntegrationTest.cs
@@ -23,6 +23,7 @@ using Amqp.Framing;
using Apache.NMS;
using Apache.NMS.AMQP;
using Apache.NMS.AMQP.Message;
+using Apache.NMS.AMQP.Policies;
using Apache.NMS.AMQP.Util;
using Moq;
using NMS.AMQP.Test.TestAmqp;
@@ -1097,5 +1098,277 @@ namespace NMS.AMQP.Test.Integration
testPeer.WaitForAllMatchersToComplete(2000);
}
}
+
+ [Test, Timeout(20_000)]
+ public void TestMessageNackedWhenRedeliveryCountExceeded()
+ {
+ using (var testPeer = new TestAmqpPeer())
+ {
+ var connection = EstablishConnection(testPeer);
+ connection.RedeliveryPolicy = new DefaultRedeliveryPolicy {
MaximumRedeliveries = 1 };
+ connection.Start();
+
+ testPeer.ExpectBegin();
+
+ var session =
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+ var queue = session.GetQueue("myQueue");
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(message: new
Amqp.Message { BodySection = new AmqpValue { Value = "hello" } });
+ testPeer.ExpectDispositionThatIsModifiedFailedAndSettled();
+
+ IMessageConsumer consumer = session.CreateConsumer(queue);
+
+ IMessage m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+ Assert.NotNull(m, "Message should have been received");
+ Assert.IsInstanceOf<ITextMessage>(m);
+ session.Recover();
+
+ m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+
+ Assert.NotNull(m, "Message should have been received");
+ Assert.IsInstanceOf<ITextMessage>(m);
+ session.Recover();
+
+ // Verify the message is no longer there. Will drain to be sure there are no messages.
+ Assert.IsNull(consumer.Receive(TimeSpan.Zero), "Message should not have been received");
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(3000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestMessageAcceptedWhenRedeliveryCountExceeded()
+ {
+ using (var testPeer = new TestAmqpPeer())
+ {
+ var connection = EstablishConnection(testPeer);
+ connection.RedeliveryPolicy = new CustomRedeliveryPolicy {
MaximumRedeliveries = 1, Outcome = 0 };
+ connection.Start();
+
+ testPeer.ExpectBegin();
+
+ var session =
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+ var queue = session.GetQueue("myQueue");
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(message: new
Amqp.Message { BodySection = new AmqpValue { Value = "hello" } });
+ testPeer.ExpectDisposition(settled: true, state =>
+ {
+ Assert.AreEqual(state.Descriptor.Code,
MessageSupport.ACCEPTED_INSTANCE.Descriptor.Code);
+ });
+
+ var consumer = session.CreateConsumer(queue);
+
+ var m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+ Assert.NotNull(m, "Message should have been received");
+ Assert.IsInstanceOf<ITextMessage>(m);
+ session.Recover();
+
+ m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+
+ Assert.NotNull(m, "Message should have been received");
+ Assert.IsInstanceOf<ITextMessage>(m);
+ session.Recover();
+
+ // Verify the message is no longer there. Will drain to be sure there are no messages.
+ Assert.IsNull(consumer.Receive(TimeSpan.Zero), "Message should not have been received");
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(3000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestMessageRejectedWhenRedeliveryCountExceeded()
+ {
+ using (var testPeer = new TestAmqpPeer())
+ {
+ var connection = EstablishConnection(testPeer);
+ connection.RedeliveryPolicy = new CustomRedeliveryPolicy {
MaximumRedeliveries = 1, Outcome = 1 };
+ connection.Start();
+
+ testPeer.ExpectBegin();
+
+ var session =
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+ var queue = session.GetQueue("myQueue");
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(message: new
Amqp.Message { BodySection = new AmqpValue { Value = "hello" } });
+ testPeer.ExpectDisposition(settled: true, state =>
+ {
+ Assert.AreEqual(state.Descriptor.Code,
MessageSupport.REJECTED_INSTANCE.Descriptor.Code);
+ });
+
+ var consumer = session.CreateConsumer(queue);
+
+ var m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+ Assert.NotNull(m, "Message should have been received");
+ Assert.IsInstanceOf<ITextMessage>(m);
+ session.Recover();
+
+ m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+
+ Assert.NotNull(m, "Message should have been received");
+ Assert.IsInstanceOf<ITextMessage>(m);
+ session.Recover();
+
+ // Verify the message is no longer there. Will drain to be sure there are no messages.
+ Assert.IsNull(consumer.Receive(TimeSpan.Zero), "Message should not have been received");
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(3000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestMessageReleasedWhenRedeliveryCountExceeded()
+ {
+ using (var testPeer = new TestAmqpPeer())
+ {
+ var connection = EstablishConnection(testPeer);
+ connection.RedeliveryPolicy = new CustomRedeliveryPolicy {
MaximumRedeliveries = 1, Outcome = 2 };
+ connection.Start();
+
+ testPeer.ExpectBegin();
+
+ var session =
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+ var queue = session.GetQueue("myQueue");
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(message: new
Amqp.Message { BodySection = new AmqpValue { Value = "hello" } });
+ testPeer.ExpectDisposition(settled: true, state =>
+ {
+ Assert.AreEqual(state.Descriptor.Code,
MessageSupport.RELEASED_INSTANCE.Descriptor.Code);
+ });
+
+ var consumer = session.CreateConsumer(queue);
+
+ var m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+ Assert.NotNull(m, "Message should have been received");
+ Assert.IsInstanceOf<ITextMessage>(m);
+ session.Recover();
+
+ m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+
+ Assert.NotNull(m, "Message should have been received");
+ Assert.IsInstanceOf<ITextMessage>(m);
+ session.Recover();
+
+ // Verify the message is no longer there. Will drain to be sure there are no messages.
+ Assert.IsNull(consumer.Receive(TimeSpan.Zero), "Message should not have been received");
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(3000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestMessageModifiedFailedWhenRedeliveryCountExceeded()
+ {
+ using (var testPeer = new TestAmqpPeer())
+ {
+ var connection = EstablishConnection(testPeer);
+ connection.RedeliveryPolicy = new CustomRedeliveryPolicy {
MaximumRedeliveries = 1, Outcome = 3 };
+ connection.Start();
+
+ testPeer.ExpectBegin();
+
+ var session =
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+ var queue = session.GetQueue("myQueue");
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(message: new
Amqp.Message { BodySection = new AmqpValue { Value = "hello" } });
+ testPeer.ExpectDisposition(settled: true, state =>
+ {
+ Assert.AreEqual(state.Descriptor.Code,
MessageSupport.MODIFIED_INSTANCE.Descriptor.Code);
+ });
+
+ var consumer = session.CreateConsumer(queue);
+
+ var m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+ Assert.NotNull(m, "Message should have been received");
+ Assert.IsInstanceOf<ITextMessage>(m);
+ session.Recover();
+
+ m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+
+ Assert.NotNull(m, "Message should have been received");
+ Assert.IsInstanceOf<ITextMessage>(m);
+ session.Recover();
+
+ // Verify the message is no longer there. Will drain to be sure there are no messages.
+ Assert.IsNull(consumer.Receive(TimeSpan.Zero), "Message should not have been received");
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(3000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestMessageModifiedFailedUndeliverableWhenRedeliveryCountExceeded()
+ {
+ using (var testPeer = new TestAmqpPeer())
+ {
+ var connection = EstablishConnection(testPeer);
+ connection.RedeliveryPolicy = new CustomRedeliveryPolicy {
MaximumRedeliveries = 1, Outcome = 4 };
+ connection.Start();
+
+ testPeer.ExpectBegin();
+
+ var session =
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+ var queue = session.GetQueue("myQueue");
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(message: new
Amqp.Message { BodySection = new AmqpValue { Value = "hello" } });
+ testPeer.ExpectDisposition(settled: true, state =>
+ {
+ Assert.AreEqual(state.Descriptor.Code,
MessageSupport.MODIFIED_INSTANCE.Descriptor.Code);
+ Assert.IsTrue(state is Modified modified &&
modified.DeliveryFailed && modified.UndeliverableHere);
+ });
+
+ var consumer = session.CreateConsumer(queue);
+
+ var m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+ Assert.NotNull(m, "Message should have been received");
+ Assert.IsInstanceOf<ITextMessage>(m);
+ session.Recover();
+
+ m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+
+ Assert.NotNull(m, "Message should have been received");
+ Assert.IsInstanceOf<ITextMessage>(m);
+ session.Recover();
+
+ // Verify the message is no longer there. Will drain to be sure there are no messages.
+ Assert.IsNull(consumer.Receive(TimeSpan.Zero), "Message should not have been received");
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(3000);
+ }
+ }
+
+ private class CustomRedeliveryPolicy : Apache.NMS.Policies.RedeliveryPolicy
+ {
+ public int Outcome { get; set; }
+
+ public override int GetOutcome(IDestination destination)
+ {
+ return Outcome;
+ }
+ }
}
}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
index e61cc88..d59329e 100644
--- a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
+++ b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
@@ -726,6 +726,13 @@ namespace NMS.AMQP.Test.TestAmqp
ExpectDisposition(settled: true, stateMatcher: stateMatcher);
}
+
+ public void ExpectDispositionThatIsModifiedFailedAndSettled()
+ {
+ Action<DeliveryState> stateMatcher = state => { Assert.AreEqual(state.Descriptor.Code, MessageSupport.MODIFIED_FAILED_INSTANCE.Descriptor.Code); };
+
+ ExpectDisposition(settled: true, stateMatcher: stateMatcher);
+ }
public void ExpectDisposition(bool settled, Action<DeliveryState> stateMatcher, uint? firstDeliveryId = null, uint? lastDeliveryId = null)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact