This is an automated email from the ASF dual-hosted git repository.
popduke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/bifromq.git
The following commit(s) were added to refs/heads/main by this push:
new f29670378 1. on back pressure rejected inbox commits, directly
schedule a delayed retry. (#241)
f29670378 is described below
commit f29670378cb95bbea2ccb126bf68d9eebdbec410
Author: Gu Jiawei <[email protected]>
AuthorDate: Mon Apr 6 20:21:34 2026 -0700
1. on back pressure rejected inbox commits, directly schedule a delayed
retry. (#241)
Co-authored-by: Yonny(Yu) Hao <[email protected]>
---
.../mqtt/handler/MQTTPersistentSessionHandler.java | 14 +++----------
.../mqtt/handler/v3/MQTTPersistentS2CPubTest.java | 24 ++++++++++++++++++++++
2 files changed, 27 insertions(+), 11 deletions(-)
diff --git
a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTPersistentSessionHandler.java
b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTPersistentSessionHandler.java
index 01e715828..a21f59922 100644
---
a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTPersistentSessionHandler.java
+++
b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTPersistentSessionHandler.java
@@ -406,14 +406,6 @@ public abstract class MQTTPersistentSessionHandler extends
MQTTSessionHandler im
}
}
- private void scheduleConfirmTimeout(long upToSeq) {
- confirmTimeout = ctx.executor().schedule(() -> {
- if (upToSeq < inboxConfirmedUpToSeq) {
- confirmSendBuffer();
- }
- }, ThreadLocalRandom.current().nextLong(15, 45), TimeUnit.SECONDS);
- }
-
private void confirmQoS0() {
if (qos0Confirming) {
return;
@@ -503,9 +495,9 @@ public abstract class MQTTPersistentSessionHandler extends
MQTTSessionHandler im
handleProtocolResponse(helper().onInboxTransientError(v.getCode().name()));
case BACK_PRESSURE_REJECTED -> {
inboxConfirming = false;
- if (upToSeq < inboxConfirmedUpToSeq) {
- scheduleConfirmTimeout(upToSeq);
- }
+ // schedule confirm later
+ confirmTimeout = ctx.executor()
+ .schedule(this::confirmSendBuffer,
ThreadLocalRandom.current().nextLong(15, 45), TimeUnit.SECONDS);
}
case TRY_LATER -> {
// try again with same version
diff --git
a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTPersistentS2CPubTest.java
b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTPersistentS2CPubTest.java
index cfa2ea407..86779559d 100644
---
a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTPersistentS2CPubTest.java
+++
b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTPersistentS2CPubTest.java
@@ -245,6 +245,30 @@ public class MQTTPersistentS2CPubTest extends BaseMQTTTest
{
verify(inboxClient, times(messageCount)).unsub(any());
}
+ @Test
+ public void retryCommitAfterBackPressure() {
+ mockAuthCheck(true);
+ when(inboxClient.commit(any()))
+ .thenReturn(CompletableFuture.completedFuture(
+
CommitReply.newBuilder().setCode(CommitReply.Code.BACK_PRESSURE_REJECTED).build()))
+ .thenReturn(CompletableFuture.completedFuture(
+
CommitReply.newBuilder().setCode(CommitReply.Code.OK).build()));
+
+ inboxFetchConsumer.accept(fetch(1, 128, AT_LEAST_ONCE));
+ channel.runPendingTasks();
+
+ MqttPublishMessage message = channel.readOutbound();
+ assertEquals(message.fixedHeader().qosLevel().value(),
QoS.AT_LEAST_ONCE_VALUE);
+
channel.writeInbound(MQTTMessageUtils.pubAckMessage(message.variableHeader().packetId()));
+ channel.runPendingTasks();
+
+ channel.advanceTimeBy(45, TimeUnit.SECONDS);
+ channel.runScheduledPendingTasks();
+ channel.runPendingTasks();
+
+ verify(inboxClient,
times(2)).commit(argThat(CommitRequest::hasSendBufferUpToSeq));
+ }
+
@Test
public void fetchTryLater() {
inboxFetchConsumer.accept(Fetched.newBuilder().setResult(Result.TRY_LATER).build());