This is an automated email from the ASF dual-hosted git repository.

popduke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/bifromq.git


The following commit(s) were added to refs/heads/main by this push:
     new f29670378 1. on back pressure rejected inbox commits, directly 
schedule a delayed retry. (#241)
f29670378 is described below

commit f29670378cb95bbea2ccb126bf68d9eebdbec410
Author: Gu Jiawei <[email protected]>
AuthorDate: Mon Apr 6 20:21:34 2026 -0700

    1. on back pressure rejected inbox commits, directly schedule a delayed 
retry. (#241)
    
    Co-authored-by: Yonny(Yu) Hao <[email protected]>
---
 .../mqtt/handler/MQTTPersistentSessionHandler.java | 14 +++----------
 .../mqtt/handler/v3/MQTTPersistentS2CPubTest.java  | 24 ++++++++++++++++++++++
 2 files changed, 27 insertions(+), 11 deletions(-)

diff --git 
a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTPersistentSessionHandler.java
 
b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTPersistentSessionHandler.java
index 01e715828..a21f59922 100644
--- 
a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTPersistentSessionHandler.java
+++ 
b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTPersistentSessionHandler.java
@@ -406,14 +406,6 @@ public abstract class MQTTPersistentSessionHandler extends 
MQTTSessionHandler im
         }
     }
 
-    private void scheduleConfirmTimeout(long upToSeq) {
-        confirmTimeout = ctx.executor().schedule(() -> {
-            if (upToSeq < inboxConfirmedUpToSeq) {
-                confirmSendBuffer();
-            }
-        }, ThreadLocalRandom.current().nextLong(15, 45), TimeUnit.SECONDS);
-    }
-
     private void confirmQoS0() {
         if (qos0Confirming) {
             return;
@@ -503,9 +495,9 @@ public abstract class MQTTPersistentSessionHandler extends 
MQTTSessionHandler im
                         
handleProtocolResponse(helper().onInboxTransientError(v.getCode().name()));
                     case BACK_PRESSURE_REJECTED -> {
                         inboxConfirming = false;
-                        if (upToSeq < inboxConfirmedUpToSeq) {
-                            scheduleConfirmTimeout(upToSeq);
-                        }
+                        // schedule confirm later
+                        confirmTimeout = ctx.executor()
+                            .schedule(this::confirmSendBuffer, 
ThreadLocalRandom.current().nextLong(15, 45), TimeUnit.SECONDS);
                     }
                     case TRY_LATER -> {
                         // try again with same version
diff --git 
a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTPersistentS2CPubTest.java
 
b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTPersistentS2CPubTest.java
index cfa2ea407..86779559d 100644
--- 
a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTPersistentS2CPubTest.java
+++ 
b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTPersistentS2CPubTest.java
@@ -245,6 +245,30 @@ public class MQTTPersistentS2CPubTest extends BaseMQTTTest 
{
         verify(inboxClient, times(messageCount)).unsub(any());
     }
 
+    @Test
+    public void retryCommitAfterBackPressure() {
+        mockAuthCheck(true);
+        when(inboxClient.commit(any()))
+            .thenReturn(CompletableFuture.completedFuture(
+                
CommitReply.newBuilder().setCode(CommitReply.Code.BACK_PRESSURE_REJECTED).build()))
+            .thenReturn(CompletableFuture.completedFuture(
+                
CommitReply.newBuilder().setCode(CommitReply.Code.OK).build()));
+
+        inboxFetchConsumer.accept(fetch(1, 128, AT_LEAST_ONCE));
+        channel.runPendingTasks();
+
+        MqttPublishMessage message = channel.readOutbound();
+        assertEquals(message.fixedHeader().qosLevel().value(), 
QoS.AT_LEAST_ONCE_VALUE);
+        
channel.writeInbound(MQTTMessageUtils.pubAckMessage(message.variableHeader().packetId()));
+        channel.runPendingTasks();
+
+        channel.advanceTimeBy(45, TimeUnit.SECONDS);
+        channel.runScheduledPendingTasks();
+        channel.runPendingTasks();
+
+        verify(inboxClient, 
times(2)).commit(argThat(CommitRequest::hasSendBufferUpToSeq));
+    }
+
     @Test
     public void fetchTryLater() {
         
inboxFetchConsumer.accept(Fetched.newBuilder().setResult(Result.TRY_LATER).build());

Reply via email to