This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5e509dc364e KAFKA-20287 : Fix CF handle leaks (#21751)
5e509dc364e is described below

commit 5e509dc364ea91a48c46cad065b4aecbcd911e4b
Author: Murali Basani <[email protected]>
AuthorDate: Fri Mar 27 23:35:02 2026 +0100

    KAFKA-20287 : Fix CF handle leaks (#21751)
    
    - RocksDBStore.java — openRocksDB() base method: Wrapped RocksDB.open(),
    createColumnFamilies() and mergeColumnFamilyHandleLists() in a try/catch
    that closes all existing and newly created CF handles and rethrows if
    any of them fails (the db itself is closed by the openDB() cleanup).
    
    - RocksDBStore.java — openDB(): Added a catch block that calls
    closeNativeResources() to close all partially-initialized native
    resources if openRocksDB() or cfAccessor.open() fails, then rethrows.
    
    - RocksDBTimestampedStore.java: Changed RocksIterator from manual
    close() to try-with-resources, and added a catch block that closes all
    CF handles and rethrows if an exception occurs before the accessor
    takes ownership.
    
    - RocksDBMigratingSessionStoreWithHeaders.java: Same as above —
    try-with-resources for RocksIterator and a catch block that closes all
    CF handles and rethrows on failure.
    
    - RocksDBTimestampedStoreWithHeaders.java — openFromDefaultStore():
    Added a catch block that closes all CF handles and rethrows if an
    exception occurs before the accessor takes ownership.
    
    - RocksDBTimestampedStoreWithHeaders.java — openFromTimestampedStore():
    Replaced manual per-handle close() calls (which missed
    columnFamilies.get(3)) with a single catch block that loops over all
    handles on failure and then rethrows.
    
    Reviewers: Matthias J. Sax <[email protected]>, Alieh Saeedi
     <[email protected]>
---
 .../RocksDBMigratingSessionStoreWithHeaders.java   |  37 ++++---
 .../RocksDBMigratingWindowStoreWithHeaders.java    |   7 +-
 .../streams/state/internals/RocksDBStore.java      | 107 +++++++++++++++++----
 .../state/internals/RocksDBTimestampedStore.java   |  38 +++++---
 .../RocksDBTimestampedStoreWithHeaders.java        |  91 ++++++++++--------
 5 files changed, 187 insertions(+), 93 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeaders.java
index a54a846cb45..b1e081abd03 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeaders.java
@@ -67,24 +67,29 @@ public class RocksDBMigratingSessionStoreWithHeaders 
extends RocksDBStore implem
         final ColumnFamilyHandle withHeadersColumnFamily = 
columnFamilies.get(1);
         final ColumnFamilyHandle offsetsCf = columnFamilies.get(2);
 
-        final RocksIterator noHeadersIter = 
db.newIterator(noHeadersColumnFamily);
-        noHeadersIter.seekToFirst();
-        if (noHeadersIter.isValid()) {
-            log.info("Opening store {} in upgrade mode", name);
-            cfAccessor = new DualColumnFamilyAccessor(
-                offsetsCf,
-                noHeadersColumnFamily,
-                withHeadersColumnFamily,
-                HeadersBytesStore::convertToHeaderFormat,
-                this,
+        try (final RocksIterator noHeadersIter = 
db.newIterator(noHeadersColumnFamily)) {
+            noHeadersIter.seekToFirst();
+            if (noHeadersIter.isValid()) {
+                log.info("Opening store {} in upgrade mode", name);
+                cfAccessor = new DualColumnFamilyAccessor(
+                    offsetsCf,
+                    noHeadersColumnFamily,
+                    withHeadersColumnFamily,
+                    HeadersBytesStore::convertToHeaderFormat,
+                    this,
                     open
-            );
-        } else {
-            log.info("Opening store {} in regular mode", name);
-            cfAccessor = new SingleColumnFamilyAccessor(offsetsCf, 
withHeadersColumnFamily);
-            noHeadersColumnFamily.close();
+                );
+            } else {
+                log.info("Opening store {} in regular mode", name);
+                cfAccessor = new SingleColumnFamilyAccessor(offsetsCf, 
withHeadersColumnFamily);
+                noHeadersColumnFamily.close();
+            }
+        } catch (final RuntimeException e) {
+            for (final ColumnFamilyHandle handle : columnFamilies) {
+                handle.close();
+            }
+            throw e;
         }
-        noHeadersIter.close();
     }
 
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingWindowStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingWindowStoreWithHeaders.java
index e0a25e6bbc7..0ca73a25676 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingWindowStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingWindowStoreWithHeaders.java
@@ -77,8 +77,6 @@ public class RocksDBMigratingWindowStoreWithHeaders extends 
RocksDBStore impleme
             noHeadersIter.seekToFirst();
             if (noHeadersIter.isValid()) {
                 log.info("Opening window store {} in upgrade mode from plain 
value format", name);
-                // Migrate from [value] to [headers][value]
-                // Add empty headers prefix [0x00] to plain value
                 cfAccessor = new DualColumnFamilyAccessor(
                     offsetsCf,
                     noHeadersColumnFamily,
@@ -92,6 +90,11 @@ public class RocksDBMigratingWindowStoreWithHeaders extends 
RocksDBStore impleme
                 cfAccessor = new SingleColumnFamilyAccessor(offsetsCf, 
withHeadersColumnFamily);
                 noHeadersColumnFamily.close();
             }
+        } catch (final RuntimeException e) {
+            for (final ColumnFamilyHandle handle : columnFamilies) {
+                handle.close();
+            }
+            throw e;
         }
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 8de21f4d40f..783cae5b8c2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -245,21 +245,26 @@ public class RocksDBStore implements KeyValueStore<Bytes, 
byte[]>, BatchWritingS
         // Setup statistics before the database is opened, otherwise the 
statistics are not updated
         // with the measurements from Rocks DB
         setupStatistics(configs, dbOptions);
-        openRocksDB(dbOptions, columnFamilyOptions);
-        dbAccessor = new DirectDBAccessor(db, fOptions, wOptions);
         try {
-            final Position existingPositionOrEmpty = 
cfAccessor.open(dbAccessor, !eosEnabled);
-            if (position == null) {
-                position = existingPositionOrEmpty;
-            } else {
-                // For segmented stores, the overall position is composed of 
multiple underlying stores, so merge this store's position into it.
-                position.merge(existingPositionOrEmpty);
+            openRocksDB(dbOptions, columnFamilyOptions);
+            dbAccessor = new DirectDBAccessor(db, fOptions, wOptions);
+            try {
+                final Position existingPositionOrEmpty = 
cfAccessor.open(dbAccessor, !eosEnabled);
+                if (position == null) {
+                    position = existingPositionOrEmpty;
+                } else {
+                    // For segmented stores, the overall position is composed 
of multiple underlying stores, so merge this store's position into it.
+                    position.merge(existingPositionOrEmpty);
+                }
+            } catch (final StreamsException fatal) {
+                final String fatalMessage = "State store " + name + " didn't 
find a valid state, since under EOS it has the risk of getting uncommitted data 
in stores";
+                throw new ProcessorStateException(fatalMessage, fatal);
+            } catch (final RocksDBException e) {
+                throw new ProcessorStateException("Error opening store " + 
name, e);
             }
-        } catch (final StreamsException fatal) {
-            final String fatalMessage = "State store " + name + " didn't find 
a valid state, since under EOS it has the risk of getting uncommitted data in 
stores";
-            throw new ProcessorStateException(fatalMessage, fatal);
-        } catch (final RocksDBException e) {
-            throw new ProcessorStateException("Error opening store " + name, 
e);
+        } catch (final RuntimeException e) {
+            closeNativeResources();
+            throw e;
         }
 
         addValueProvidersToMetricsRecorder();
@@ -361,10 +366,20 @@ public class RocksDBStore implements KeyValueStore<Bytes, 
byte[]>, BatchWritingS
                     .filter(descriptor -> 
allExisting.stream().noneMatch(existing -> Arrays.equals(existing, 
descriptor.getName())))
                     .collect(Collectors.toList());
             final List<ColumnFamilyHandle> existingColumnFamilies = new 
ArrayList<>(existingDescriptors.size());
-            db = RocksDB.open(dbOptions, absolutePath, existingDescriptors, 
existingColumnFamilies);
-            final List<ColumnFamilyHandle> createdColumnFamilies = 
db.createColumnFamilies(toCreate);
-
-            return mergeColumnFamilyHandleLists(existingColumnFamilies, 
createdColumnFamilies, allDescriptors);
+            final List<ColumnFamilyHandle> createdColumnFamilies = new 
ArrayList<>();
+            try {
+                db = RocksDB.open(dbOptions, absolutePath, 
existingDescriptors, existingColumnFamilies);
+                
createdColumnFamilies.addAll(db.createColumnFamilies(toCreate));
+                return mergeColumnFamilyHandleLists(existingColumnFamilies, 
createdColumnFamilies, allDescriptors);
+            } catch (final Exception e) {
+                for (final ColumnFamilyHandle handle : existingColumnFamilies) 
{
+                    handle.close();
+                }
+                for (final ColumnFamilyHandle handle : createdColumnFamilies) {
+                    handle.close();
+                }
+                throw e;
+            }
 
         } catch (final RocksDBException e) {
             throw new ProcessorStateException("Error opening store " + name + 
" at location " + dbDir.toString(), e);
@@ -787,6 +802,64 @@ public class RocksDBStore implements KeyValueStore<Bytes, 
byte[]>, BatchWritingS
         statistics = null;
     }
 
+    /**
+     * Close all native RocksDB resources with null-safety.
+     * Used only by the error cleanup path in {@link #openDB} where some 
resources
+     * may not have been initialized yet.
+     */
+    private void closeNativeResources() {
+        closeDbAndAccessors();
+        closeOptionsAndFilters();
+    }
+
+    private void closeDbAndAccessors() {
+        if (cfAccessor != null) {
+            try {
+                if (dbAccessor != null) {
+                    cfAccessor.close(dbAccessor);
+                }
+            } catch (final Exception e) {
+                log.error("Error while closing column family handles for store 
" + name, e);
+            }
+            cfAccessor = null;
+        }
+        if (dbAccessor != null) {
+            dbAccessor.close();
+            dbAccessor = null;
+        }
+        if (db != null) {
+            db.close();
+            db = null;
+        }
+    }
+
+    private void closeOptionsAndFilters() {
+        if (userSpecifiedOptions != null) {
+            userSpecifiedOptions.close();
+            userSpecifiedOptions = null;
+        }
+        if (wOptions != null) {
+            wOptions.close();
+            wOptions = null;
+        }
+        if (fOptions != null) {
+            fOptions.close();
+            fOptions = null;
+        }
+        if (filter != null) {
+            filter.close();
+            filter = null;
+        }
+        if (cache != null) {
+            cache.close();
+            cache = null;
+        }
+        if (statistics != null) {
+            statistics.close();
+            statistics = null;
+        }
+    }
+
     private void closeOpenIterators() {
         final HashSet<KeyValueIterator<Bytes, byte[]>> iterators;
         synchronized (openIterators) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
index b7f8fd64f3b..2a37e05fb26 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
@@ -63,23 +63,29 @@ public class RocksDBTimestampedStore extends RocksDBStore 
implements Timestamped
         final ColumnFamilyHandle withTimestampColumnFamily = 
columnFamilies.get(1);
         final ColumnFamilyHandle offsetsColumnFamily = columnFamilies.get(2);
 
-        final RocksIterator noTimestampsIter = 
db.newIterator(noTimestampColumnFamily);
-        noTimestampsIter.seekToFirst();
-        if (noTimestampsIter.isValid()) {
-            log.info("Opening store {} in upgrade mode", name);
-            cfAccessor = new DualColumnFamilyAccessor(
-                offsetsColumnFamily,
-                noTimestampColumnFamily,
-                withTimestampColumnFamily,
-                TimestampedBytesStore::convertToTimestampedFormat,
-                this, open
-            );
-        } else {
-            log.info("Opening store {} in regular mode", name);
-            cfAccessor = new SingleColumnFamilyAccessor(offsetsColumnFamily, 
withTimestampColumnFamily);
-            noTimestampColumnFamily.close();
+        try (final RocksIterator noTimestampsIter = 
db.newIterator(noTimestampColumnFamily)) {
+            noTimestampsIter.seekToFirst();
+            if (noTimestampsIter.isValid()) {
+                log.info("Opening store {} in upgrade mode", name);
+                cfAccessor = new DualColumnFamilyAccessor(
+                    offsetsColumnFamily,
+                    noTimestampColumnFamily,
+                    withTimestampColumnFamily,
+                    TimestampedBytesStore::convertToTimestampedFormat,
+                    this,
+                    open
+                );
+            } else {
+                log.info("Opening store {} in regular mode", name);
+                cfAccessor = new 
SingleColumnFamilyAccessor(offsetsColumnFamily, withTimestampColumnFamily);
+                noTimestampColumnFamily.close();
+            }
+        } catch (final RuntimeException e) {
+            for (final ColumnFamilyHandle handle : columnFamilies) {
+                handle.close();
+            }
+            throw e;
         }
-        noTimestampsIter.close();
     }
 
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
index ea8ff9bc917..50cbecfd293 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
@@ -116,13 +116,18 @@ public class RocksDBTimestampedStoreWithHeaders extends 
RocksDBStore implements
                     headersCf,
                     HeadersBytesStore::convertFromPlainToHeaderFormat,
                     this,
-                        open
+                    open
                 );
             } else {
                 log.info("Opening store {} in regular headers-aware mode", 
name);
                 cfAccessor = new SingleColumnFamilyAccessor(offsetsCf, 
headersCf);
                 defaultCf.close();
             }
+        } catch (final RuntimeException e) {
+            for (final ColumnFamilyHandle handle : columnFamilies) {
+                handle.close();
+            }
+            throw e;
         }
     }
 
@@ -137,53 +142,55 @@ public class RocksDBTimestampedStoreWithHeaders extends 
RocksDBStore implements
             new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME, 
columnFamilyOptions)
         );
 
-        // verify and close empty Default ColumnFamily
-        try (final RocksIterator defaultIter = 
db.newIterator(columnFamilies.get(0))) {
-            defaultIter.seekToFirst();
-            if (defaultIter.isValid()) {
-                // Close all column family handles before throwing
-                columnFamilies.get(0).close();
-                columnFamilies.get(1).close();
-                columnFamilies.get(2).close();
-                throw new ProcessorStateException(
-                    "Inconsistent store state for " + name + ". " +
-                        "Cannot have both plain (DEFAULT) and timestamped data 
simultaneously. " +
-                        "Headers store can upgrade from either plain or 
timestamped format, but not both."
-                );
+        try {
+            // verify and close empty Default ColumnFamily
+            try (final RocksIterator defaultIter = 
db.newIterator(columnFamilies.get(0))) {
+                defaultIter.seekToFirst();
+                if (defaultIter.isValid()) {
+                    throw new ProcessorStateException(
+                        "Inconsistent store state for " + name + ". " +
+                            "Cannot have both plain (DEFAULT) and timestamped 
data simultaneously. " +
+                            "Headers store can upgrade from either plain or 
timestamped format, but not both."
+                    );
+                }
             }
             // close default column family handle
             columnFamilies.get(0).close();
-        }
-
-        final ColumnFamilyHandle legacyTimestampedCf = columnFamilies.get(1);
-        final ColumnFamilyHandle headersCf = columnFamilies.get(2);
-        final ColumnFamilyHandle offsetsCf = columnFamilies.get(3);
 
-
-        // Check if legacy timestamped CF has data
-        try (final RocksIterator legacyIter = 
db.newIterator(legacyTimestampedCf)) {
-            legacyIter.seekToFirst();
-            if (legacyIter.isValid()) {
-                log.info("Opening store {} in upgrade mode from timestamped 
store", name);
-                cfAccessor = new DualColumnFamilyAccessor(
-                    offsetsCf,
-                    legacyTimestampedCf,
-                    headersCf,
-                    HeadersBytesStore::convertToHeaderFormat,
-                    this,
-                        open
-                );
-            } else {
-                log.info("Opening store {} in regular headers-aware mode", 
name);
-                cfAccessor = new SingleColumnFamilyAccessor(offsetsCf, 
headersCf);
-                try {
-                    db.dropColumnFamily(legacyTimestampedCf);
-                } catch (final RocksDBException e) {
-                    throw new RuntimeException(e);
-                } finally {
-                    legacyTimestampedCf.close();
+            final ColumnFamilyHandle legacyTimestampedCf = 
columnFamilies.get(1);
+            final ColumnFamilyHandle headersCf = columnFamilies.get(2);
+            final ColumnFamilyHandle offsetsCf = columnFamilies.get(3);
+
+            // Check if legacy timestamped CF has data
+            try (final RocksIterator legacyIter = 
db.newIterator(legacyTimestampedCf)) {
+                legacyIter.seekToFirst();
+                if (legacyIter.isValid()) {
+                    log.info("Opening store {} in upgrade mode from 
timestamped store", name);
+                    cfAccessor = new DualColumnFamilyAccessor(
+                        offsetsCf,
+                        legacyTimestampedCf,
+                        headersCf,
+                        HeadersBytesStore::convertToHeaderFormat,
+                        this,
+                            open
+                    );
+                } else {
+                    log.info("Opening store {} in regular headers-aware mode", 
name);
+                    cfAccessor = new SingleColumnFamilyAccessor(offsetsCf, 
headersCf);
+                    try {
+                        db.dropColumnFamily(legacyTimestampedCf);
+                    } catch (final RocksDBException e) {
+                        throw new RuntimeException(e);
+                    } finally {
+                        legacyTimestampedCf.close();
+                    }
                 }
             }
+        } catch (final RuntimeException e) {
+            for (final ColumnFamilyHandle handle : columnFamilies) {
+                handle.close();
+            }
+            throw e;
         }
     }
 

Reply via email to