This is an automated email from the ASF dual-hosted git repository.

apoorvmittal10 pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.2 by this push:
     new 834b7ac10b7 KAFKA-20505: Moving future completion out of locks for Share Partition (#22130)
834b7ac10b7 is described below

commit 834b7ac10b7e3d3bde45e4e0431cc7566de7cafb
Author: Apoorv Mittal <[email protected]>
AuthorDate: Thu Apr 23 17:28:25 2026 +0100

    KAFKA-20505: Moving future completion out of locks for Share Partition (#22130)
    
    As explained in KAFKA-20505, a deadlock can occur when the future for a
    request is completed while a lock is held and the next set of actions
    tries to acquire the lock on the purgatory (checkAndComplete/trigger
    waiting requests). Because the lock might not always be released at that
    point, a deadlock can happen. This PR moves such future completions out
    of the lock.
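    
    The pattern can be illustrated with a minimal, self-contained sketch.
    This is not the SharePartition code: the class FutureOutsideLockSketch
    and its methods are hypothetical names made up for illustration. It only
    assumes that callbacks registered on a CompletableFuture run on the
    completing thread, so completing the future after unlock() keeps those
    callbacks from running while the write lock is held.
    
    ```java
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    
    // Hypothetical stand-in for illustration; not the actual SharePartition code.
    final class FutureOutsideLockSketch {
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        private int state;
    
        // Mutates state under the write lock, then completes the future only
        // after the lock is released, so callbacks do not run while update()
        // holds the lock.
        void update(int delta, Throwable error, CompletableFuture<Void> future) {
            lock.writeLock().lock();
            try {
                if (error == null) {
                    state += delta;
                }
            } finally {
                lock.writeLock().unlock();
            }
            if (error != null) {
                future.completeExceptionally(error);
            } else {
                future.complete(null);
            }
        }
    
        boolean isWriteLockedByCurrentThread() {
            return lock.isWriteLockedByCurrentThread();
        }
    
        int state() {
            lock.readLock().lock();
            try {
                return state;
            } finally {
                lock.readLock().unlock();
            }
        }
    
        public static void main(String[] args) {
            FutureOutsideLockSketch sketch = new FutureOutsideLockSketch();
            CompletableFuture<Void> future = new CompletableFuture<>();
            // The callback runs on the completing thread; report whether the lock is held.
            future.thenRun(() -> System.out.println(
                "lock held in callback: " + sketch.isWriteLockedByCurrentThread()));
            sketch.update(1, null, future);
        }
    }
    ```
    
    If the completion were instead placed inside the try block, the same
    callback would observe the write lock as held, which is the situation
    the PR avoids.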
    
    I have also reviewed the other future completions, and it doesn't seem
    that any other changes are needed.
    
    I have tested using the franz-go Kafka test and can't reproduce the
    issue in 160 continuous runs. Earlier, the issue was reproducible within
    20-50 consecutive runs.
    
    ```
    === Run 160 ===
    === RUN   TestShareGroupETL
    === PAUSE TestShareGroupETL
    === CONT  TestShareGroupETL
    [09:59:38.788 1][INFO] producing to a new topic for the first time,
    fetching metadata to learn its partitions; topic:
    f2c35a17978f41d684a36ab8297dd873138cb3c982f095a7d0e74d7a8589411e
    ...
    ...
    [09:59:43.94 3][INFO] immediate metadata update triggered; why: forced
    load because we are producing to a topic for the first time
    [09:59:43.947 3][INFO] done waiting for metadata for new topic; topic:
    6ce53f956e91c6b106843ff7aa728451f290cbd9064cefc8dc73ee2bcadef7b9
        share_test.go:507: level 1 phase 2: adding consumers after 122923
    consumed
    [09:59:44.225 3][INFO] flushing
    [09:59:44.225 4][INFO] immediate metadata update triggered; why:
    querying metadata for consumer initialization
    ...
    ...
    [09:59:44.226 5][INFO] beginning to manage the share group lifecycle;
    group: 0045957d65fdd11a7bd0dfbc9293c3fd935da4bb359df6fa56aca8dcc119ff35
    [09:59:44.226 10][INFO] beginning to manage the share group lifecycle;
    group: 0045957d65fdd11a7bd0dfbc9293c3fd935da4bb359df6fa56aca8dcc119ff35
    [09:59:44.227 3][INFO] leaving share group; group:
    0045957d65fdd11a7bd0dfbc9293c3fd935da4bb359df6fa56aca8dcc119ff35,
    member_id: LP4lpqzQjAm-QxQdCRkSXA==
    [09:59:44.227 7][INFO] assigning share partitions; group:
    0045957d65fdd11a7bd0dfbc9293c3fd935da4bb359df6fa56aca8dcc119ff35,
    assignments: map[]
    [09:59:44.227 4][INFO] assigning share partitions; group:
    0045957d65fdd11a7bd0dfbc9293c3fd935da4bb359df6fa56aca8dcc119ff35,
    assignments: map[]
    ...
    ...
    [09:59:49.232 7][INFO] assigning share partitions; group:
    0045957d65fdd11a7bd0dfbc9293c3fd935da4bb359df6fa56aca8dcc119ff35,
    assignments:
    map[f2c35a17978f41d684a36ab8297dd873138cb3c982f095a7d0e74d7a8589411e:[1
    2]]
    [09:59:49.232 6][INFO] assigning share partitions; group:
    0045957d65fdd11a7bd0dfbc9293c3fd935da4bb359df6fa56aca8dcc119ff35,
    assignments:
    map[f2c35a17978f41d684a36ab8297dd873138cb3c982f095a7d0e74d7a8589411e:[1]]
    ...
    ...
    [09:59:49.466 18][INFO] immediate metadata update triggered; why:
    querying metadata for consumer initialization
    [09:59:49.466 13][INFO] immediate metadata update triggered; why:
    querying metadata for consumer initialization
    [09:59:49.466 15][INFO] immediate metadata update triggered; why:
    querying metadata for consumer initialization
    [09:59:49.467 16][INFO] beginning to manage the share group lifecycle;
    group: d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6
    [09:59:49.467 14][INFO] beginning to manage the share group lifecycle;
    group: d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6
    [09:59:49.467 11][INFO] beginning to manage the share group lifecycle;
    group: d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6
    ...
    ...
    [09:59:49.467 15][INFO] beginning to manage the share group lifecycle;
    group: d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6
    [09:59:49.468 13][INFO] assigning share partitions; group:
    0045957d65fdd11a7bd0dfbc9293c3fd935da4bb359df6fa56aca8dcc119ff35,
    assignments: map[]
    [09:59:49.469 17][INFO] assigning share partitions; group:
    d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6,
    assignments: map[]
    ...
    ...
    [09:59:54.472 18][INFO] assigning share partitions; group:
    d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6,
    assignments:
    map[5da63960cb4fe97d887d575a73dfbddc51a2eb8071d119b3a5ba5a2b0d87bc7e:[1]]
    [09:59:54.485 14][INFO] assigning share partitions; group:
    d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6,
    assignments:
    map[6ce53f956e91c6b106843ff7aa728451f290cbd9064cefc8dc73ee2bcadef7b9:[0]]
    ...
    ...
    [09:59:54.494 12][INFO] metadata update triggered; why: reload trigger
    due to produce topic still not known
    [09:59:54.495 12][INFO] producer id initialization success; id: 3524,
    epoch: 0
    [09:59:54.5 13][INFO] producing to a new topic for the first time,
    fetching metadata to learn its partitions; topic:
    6ce53f956e91c6b106843ff7aa728451f290cbd9064cefc8dc73ee2bcadef7b9
    [09:59:54.5 13][INFO] immediate metadata update triggered; why: forced
    load because we are producing to a topic for the first time
    ...
    ...
    [09:59:54.525 11][INFO] leaving share group; group:
    d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6,
    member_id: _4Jk9faoDdUlGMlSR9zKmg==
        share_test.go:605: level 2 rebalance 1: killing l2-c1 after 169339
    consumed
    [09:59:55.101 14][INFO] flushing
    [09:59:55.101 19][INFO] immediate metadata update triggered; why:
    querying metadata for consumer initialization
    [09:59:55.102 19][INFO] beginning to manage the share group lifecycle;
    group: d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6
    [09:59:55.103 14][INFO] leaving share group; group:
    d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6,
    member_id: wJibgxG934tiAuqCPloF_w==
    [09:59:55.107 19][INFO] assigning share partitions; group:
    d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6,
    assignments: map[]
        share_test.go:619: level 2 rebalance 2: killing l2-c3 after 375726
    consumed
    [09:59:55.401 18][INFO] flushing
    ...
    ...
    [10:00:00.915 20][INFO] leaving share group; group:
    d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6,
    member_id: HjzG4-5QwncfYfr8pSmEUQ==
        share_test.go:377: level 1: 499900 unique keys, 500624 total
    accepts, 500624 produced, 724 duplicates, 35614 redelivered, max dc 3,
    consumed 532987
        share_test.go:377: level 2: 499900 unique keys, 501513 total
    accepts, 501513 produced, 1613 duplicates, 20272 redelivered, max dc 2,
    consumed 518049
        share_test.go:704: level 1: 100 purely rejected, 35614 redelivered
        share_test.go:60: deleting topic
    f2c35a17978f41d684a36ab8297dd873138cb3c982f095a7d0e74d7a8589411e
        share_test.go:61: deleting topic
    f7e388a2de7ef0814328f9186e8c4b73b1f2437490e1b98730af9fb17ee74175
        share_test.go:62: deleting topic
    5da63960cb4fe97d887d575a73dfbddc51a2eb8071d119b3a5ba5a2b0d87bc7e
        share_test.go:63: deleting topic
    6ce53f956e91c6b106843ff7aa728451f290cbd9064cefc8dc73ee2bcadef7b9
        share_test.go:64: deleting topic
    7e74eb054cbb02e0de5da8a8018115dc01094496222039f323841770b11b8a12
        share_test.go:65: deleting topic
    4b8c44d4071cd22272ae9ac694342faa3404bd10b479fe88874bdef4a8a4276d
    --- PASS: TestShareGroupETL (22.73s)
    PASS
    ok      github.com/twmb/franz-go/pkg/kgo        22.926s
    ```
    
    Reviewers: Andrew Schofield <[email protected]>, Chia-Ping Tsai
     <[email protected]>, PoAn Yang <[email protected]>
---
 .../java/kafka/server/share/SharePartition.java    | 29 ++++++++++++----------
 1 file changed, 16 insertions(+), 13 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java
index dbd3c18f65e..c9b7186ebed 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -2450,9 +2450,9 @@ public class SharePartition {
         Throwable throwable,
         List<PersisterBatch> persisterBatches
     ) {
-        lock.writeLock().lock();
-        try {
-            if (throwable != null) {
+        if (throwable != null) {
+            lock.writeLock().lock();
+            try {
                 // Log in DEBUG to avoid flooding of logs for a faulty client.
                 log.debug("Request failed for updating state, rollback any changed state"
                     + " for the share partition: {}-{}", groupId, topicIdPartition);
@@ -2469,16 +2469,16 @@ public class SharePartition {
                         deliveryCompleteCount.addAndGet(-numInFlightRecordsInBatch(persisterBatch.stateBatch.firstOffset(), persisterBatch.stateBatch.lastOffset()));
                     }
                 });
-                future.completeExceptionally(throwable);
-                return;
+            } finally {
+                lock.writeLock().unlock();
             }
+            future.completeExceptionally(throwable);
+            return;
+        }
 
-            if (persisterBatches.isEmpty()) {
-                future.complete(null);
-                return;
-            }
-        } finally {
-            lock.writeLock().unlock();
+        if (persisterBatches.isEmpty()) {
+            future.complete(null);
+            return;
         }
 
         writeShareGroupState(persisterBatches.stream().map(PersisterBatch::stateBatch).toList())
@@ -2507,7 +2507,6 @@ public class SharePartition {
                                 deliveryCompleteCount.addAndGet(-numInFlightRecordsInBatch(persisterBatch.stateBatch.firstOffset(), persisterBatch.stateBatch.lastOffset()));
                             }
                         });
-                        future.completeExceptionally(exception);
                         return;
                     }
 
@@ -2521,9 +2520,13 @@ public class SharePartition {
                     });
                     // Update the cached state and start and end offsets after acknowledging/releasing the acquired records.
                     cacheStateUpdated = maybeUpdateCachedStateAndOffsets();
-                    future.complete(null);
                 } finally {
                     lock.writeLock().unlock();
+                    if (exception != null) {
+                        future.completeExceptionally(exception);
+                    } else {
+                        future.complete(null);
+                    }
                     // Maybe complete the delayed share fetch request if the state has been changed in cache
                     // which might have moved start offset ahead. Hence, the pending delayed share fetch
                     // request can be completed. The call should be made outside the lock to avoid deadlock.
