This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.3 by this push:
new 32530576221 KAFKA-20287 : Fix CF handle leaks (#21751)
32530576221 is described below
commit 3253057622136d14dcacf00ab325493fafdf1c4e
Author: Murali Basani <[email protected]>
AuthorDate: Fri Mar 27 23:35:02 2026 +0100
KAFKA-20287 : Fix CF handle leaks (#21751)
- RocksDBStore.java — openRocksDB() base method: Added finally block
after RocksDB.open() to close all CF handles and db if
createColumnFamilies() or mergeColumnFamilyHandleLists() fails.
- RocksDBStore.java — openDB(): Added finally block with
closeNativeResources() to close all partially-initialized native
resources if openRocksDB() or cfAccessor.open() fails.
- RocksDBTimestampedStore.java: Changed RocksIterator from manual
close() to try-with-resources, and added finally block to close all CF
handles if an exception occurs before the accessor takes ownership.
- RocksDBMigratingSessionStoreWithHeaders.java: Same as above —
try-with-resources for RocksIterator and finally block for CF handle
cleanup on failure.
- RocksDBTimestampedStoreWithHeaders.java — openFromDefaultStore():
Added finally block to close all CF handles if an exception occurs
before the accessor takes ownership.
- RocksDBTimestampedStoreWithHeaders.java — openFromTimestampedStore():
Replaced manual per-handle close() calls (which missed
columnFamilies.get(3)) with a single finally block that loops over all
handles on failure.
Reviewers: Matthias J. Sax <[email protected]>, Alieh Saeedi
<[email protected]>
---
.../RocksDBMigratingSessionStoreWithHeaders.java | 37 ++++---
.../RocksDBMigratingWindowStoreWithHeaders.java | 7 +-
.../streams/state/internals/RocksDBStore.java | 107 +++++++++++++++++----
.../state/internals/RocksDBTimestampedStore.java | 38 +++++---
.../RocksDBTimestampedStoreWithHeaders.java | 91 ++++++++++--------
5 files changed, 187 insertions(+), 93 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeaders.java
index a54a846cb45..b1e081abd03 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeaders.java
@@ -67,24 +67,29 @@ public class RocksDBMigratingSessionStoreWithHeaders
extends RocksDBStore implem
final ColumnFamilyHandle withHeadersColumnFamily =
columnFamilies.get(1);
final ColumnFamilyHandle offsetsCf = columnFamilies.get(2);
- final RocksIterator noHeadersIter =
db.newIterator(noHeadersColumnFamily);
- noHeadersIter.seekToFirst();
- if (noHeadersIter.isValid()) {
- log.info("Opening store {} in upgrade mode", name);
- cfAccessor = new DualColumnFamilyAccessor(
- offsetsCf,
- noHeadersColumnFamily,
- withHeadersColumnFamily,
- HeadersBytesStore::convertToHeaderFormat,
- this,
+ try (final RocksIterator noHeadersIter =
db.newIterator(noHeadersColumnFamily)) {
+ noHeadersIter.seekToFirst();
+ if (noHeadersIter.isValid()) {
+ log.info("Opening store {} in upgrade mode", name);
+ cfAccessor = new DualColumnFamilyAccessor(
+ offsetsCf,
+ noHeadersColumnFamily,
+ withHeadersColumnFamily,
+ HeadersBytesStore::convertToHeaderFormat,
+ this,
open
- );
- } else {
- log.info("Opening store {} in regular mode", name);
- cfAccessor = new SingleColumnFamilyAccessor(offsetsCf,
withHeadersColumnFamily);
- noHeadersColumnFamily.close();
+ );
+ } else {
+ log.info("Opening store {} in regular mode", name);
+ cfAccessor = new SingleColumnFamilyAccessor(offsetsCf,
withHeadersColumnFamily);
+ noHeadersColumnFamily.close();
+ }
+ } catch (final RuntimeException e) {
+ for (final ColumnFamilyHandle handle : columnFamilies) {
+ handle.close();
+ }
+ throw e;
}
- noHeadersIter.close();
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingWindowStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingWindowStoreWithHeaders.java
index e0a25e6bbc7..0ca73a25676 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingWindowStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingWindowStoreWithHeaders.java
@@ -77,8 +77,6 @@ public class RocksDBMigratingWindowStoreWithHeaders extends
RocksDBStore impleme
noHeadersIter.seekToFirst();
if (noHeadersIter.isValid()) {
log.info("Opening window store {} in upgrade mode from plain
value format", name);
- // Migrate from [value] to [headers][value]
- // Add empty headers prefix [0x00] to plain value
cfAccessor = new DualColumnFamilyAccessor(
offsetsCf,
noHeadersColumnFamily,
@@ -92,6 +90,11 @@ public class RocksDBMigratingWindowStoreWithHeaders extends
RocksDBStore impleme
cfAccessor = new SingleColumnFamilyAccessor(offsetsCf,
withHeadersColumnFamily);
noHeadersColumnFamily.close();
}
+ } catch (final RuntimeException e) {
+ for (final ColumnFamilyHandle handle : columnFamilies) {
+ handle.close();
+ }
+ throw e;
}
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 8de21f4d40f..783cae5b8c2 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -245,21 +245,26 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]>, BatchWritingS
// Setup statistics before the database is opened, otherwise the
statistics are not updated
// with the measurements from Rocks DB
setupStatistics(configs, dbOptions);
- openRocksDB(dbOptions, columnFamilyOptions);
- dbAccessor = new DirectDBAccessor(db, fOptions, wOptions);
try {
- final Position existingPositionOrEmpty =
cfAccessor.open(dbAccessor, !eosEnabled);
- if (position == null) {
- position = existingPositionOrEmpty;
- } else {
- // For segmented stores, the overall position is composed of
multiple underlying stores, so merge this store's position into it.
- position.merge(existingPositionOrEmpty);
+ openRocksDB(dbOptions, columnFamilyOptions);
+ dbAccessor = new DirectDBAccessor(db, fOptions, wOptions);
+ try {
+ final Position existingPositionOrEmpty =
cfAccessor.open(dbAccessor, !eosEnabled);
+ if (position == null) {
+ position = existingPositionOrEmpty;
+ } else {
+ // For segmented stores, the overall position is composed
of multiple underlying stores, so merge this store's position into it.
+ position.merge(existingPositionOrEmpty);
+ }
+ } catch (final StreamsException fatal) {
+ final String fatalMessage = "State store " + name + " didn't
find a valid state, since under EOS it has the risk of getting uncommitted data
in stores";
+ throw new ProcessorStateException(fatalMessage, fatal);
+ } catch (final RocksDBException e) {
+ throw new ProcessorStateException("Error opening store " +
name, e);
}
- } catch (final StreamsException fatal) {
- final String fatalMessage = "State store " + name + " didn't find
a valid state, since under EOS it has the risk of getting uncommitted data in
stores";
- throw new ProcessorStateException(fatalMessage, fatal);
- } catch (final RocksDBException e) {
- throw new ProcessorStateException("Error opening store " + name,
e);
+ } catch (final RuntimeException e) {
+ closeNativeResources();
+ throw e;
}
addValueProvidersToMetricsRecorder();
@@ -361,10 +366,20 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]>, BatchWritingS
.filter(descriptor ->
allExisting.stream().noneMatch(existing -> Arrays.equals(existing,
descriptor.getName())))
.collect(Collectors.toList());
final List<ColumnFamilyHandle> existingColumnFamilies = new
ArrayList<>(existingDescriptors.size());
- db = RocksDB.open(dbOptions, absolutePath, existingDescriptors,
existingColumnFamilies);
- final List<ColumnFamilyHandle> createdColumnFamilies =
db.createColumnFamilies(toCreate);
-
- return mergeColumnFamilyHandleLists(existingColumnFamilies,
createdColumnFamilies, allDescriptors);
+ final List<ColumnFamilyHandle> createdColumnFamilies = new
ArrayList<>();
+ try {
+ db = RocksDB.open(dbOptions, absolutePath,
existingDescriptors, existingColumnFamilies);
+
createdColumnFamilies.addAll(db.createColumnFamilies(toCreate));
+ return mergeColumnFamilyHandleLists(existingColumnFamilies,
createdColumnFamilies, allDescriptors);
+ } catch (final Exception e) {
+ for (final ColumnFamilyHandle handle : existingColumnFamilies)
{
+ handle.close();
+ }
+ for (final ColumnFamilyHandle handle : createdColumnFamilies) {
+ handle.close();
+ }
+ throw e;
+ }
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error opening store " + name +
" at location " + dbDir.toString(), e);
@@ -787,6 +802,64 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]>, BatchWritingS
statistics = null;
}
+ /**
+ * Close all native RocksDB resources with null-safety.
+ * Used only by the error cleanup path in {@link #openDB} where some
resources
+ * may not have been initialized yet.
+ */
+ private void closeNativeResources() {
+ closeDbAndAccessors();
+ closeOptionsAndFilters();
+ }
+
+ private void closeDbAndAccessors() {
+ if (cfAccessor != null) {
+ try {
+ if (dbAccessor != null) {
+ cfAccessor.close(dbAccessor);
+ }
+ } catch (final Exception e) {
+ log.error("Error while closing column family handles for store
" + name, e);
+ }
+ cfAccessor = null;
+ }
+ if (dbAccessor != null) {
+ dbAccessor.close();
+ dbAccessor = null;
+ }
+ if (db != null) {
+ db.close();
+ db = null;
+ }
+ }
+
+ private void closeOptionsAndFilters() {
+ if (userSpecifiedOptions != null) {
+ userSpecifiedOptions.close();
+ userSpecifiedOptions = null;
+ }
+ if (wOptions != null) {
+ wOptions.close();
+ wOptions = null;
+ }
+ if (fOptions != null) {
+ fOptions.close();
+ fOptions = null;
+ }
+ if (filter != null) {
+ filter.close();
+ filter = null;
+ }
+ if (cache != null) {
+ cache.close();
+ cache = null;
+ }
+ if (statistics != null) {
+ statistics.close();
+ statistics = null;
+ }
+ }
+
private void closeOpenIterators() {
final HashSet<KeyValueIterator<Bytes, byte[]>> iterators;
synchronized (openIterators) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
index b7f8fd64f3b..2a37e05fb26 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
@@ -63,23 +63,29 @@ public class RocksDBTimestampedStore extends RocksDBStore
implements Timestamped
final ColumnFamilyHandle withTimestampColumnFamily =
columnFamilies.get(1);
final ColumnFamilyHandle offsetsColumnFamily = columnFamilies.get(2);
- final RocksIterator noTimestampsIter =
db.newIterator(noTimestampColumnFamily);
- noTimestampsIter.seekToFirst();
- if (noTimestampsIter.isValid()) {
- log.info("Opening store {} in upgrade mode", name);
- cfAccessor = new DualColumnFamilyAccessor(
- offsetsColumnFamily,
- noTimestampColumnFamily,
- withTimestampColumnFamily,
- TimestampedBytesStore::convertToTimestampedFormat,
- this, open
- );
- } else {
- log.info("Opening store {} in regular mode", name);
- cfAccessor = new SingleColumnFamilyAccessor(offsetsColumnFamily,
withTimestampColumnFamily);
- noTimestampColumnFamily.close();
+ try (final RocksIterator noTimestampsIter =
db.newIterator(noTimestampColumnFamily)) {
+ noTimestampsIter.seekToFirst();
+ if (noTimestampsIter.isValid()) {
+ log.info("Opening store {} in upgrade mode", name);
+ cfAccessor = new DualColumnFamilyAccessor(
+ offsetsColumnFamily,
+ noTimestampColumnFamily,
+ withTimestampColumnFamily,
+ TimestampedBytesStore::convertToTimestampedFormat,
+ this,
+ open
+ );
+ } else {
+ log.info("Opening store {} in regular mode", name);
+ cfAccessor = new
SingleColumnFamilyAccessor(offsetsColumnFamily, withTimestampColumnFamily);
+ noTimestampColumnFamily.close();
+ }
+ } catch (final RuntimeException e) {
+ for (final ColumnFamilyHandle handle : columnFamilies) {
+ handle.close();
+ }
+ throw e;
}
- noTimestampsIter.close();
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
index ea8ff9bc917..50cbecfd293 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
@@ -116,13 +116,18 @@ public class RocksDBTimestampedStoreWithHeaders extends
RocksDBStore implements
headersCf,
HeadersBytesStore::convertFromPlainToHeaderFormat,
this,
- open
+ open
);
} else {
log.info("Opening store {} in regular headers-aware mode",
name);
cfAccessor = new SingleColumnFamilyAccessor(offsetsCf,
headersCf);
defaultCf.close();
}
+ } catch (final RuntimeException e) {
+ for (final ColumnFamilyHandle handle : columnFamilies) {
+ handle.close();
+ }
+ throw e;
}
}
@@ -137,53 +142,55 @@ public class RocksDBTimestampedStoreWithHeaders extends
RocksDBStore implements
new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME,
columnFamilyOptions)
);
- // verify and close empty Default ColumnFamily
- try (final RocksIterator defaultIter =
db.newIterator(columnFamilies.get(0))) {
- defaultIter.seekToFirst();
- if (defaultIter.isValid()) {
- // Close all column family handles before throwing
- columnFamilies.get(0).close();
- columnFamilies.get(1).close();
- columnFamilies.get(2).close();
- throw new ProcessorStateException(
- "Inconsistent store state for " + name + ". " +
- "Cannot have both plain (DEFAULT) and timestamped data
simultaneously. " +
- "Headers store can upgrade from either plain or
timestamped format, but not both."
- );
+ try {
+ // verify and close empty Default ColumnFamily
+ try (final RocksIterator defaultIter =
db.newIterator(columnFamilies.get(0))) {
+ defaultIter.seekToFirst();
+ if (defaultIter.isValid()) {
+ throw new ProcessorStateException(
+ "Inconsistent store state for " + name + ". " +
+ "Cannot have both plain (DEFAULT) and timestamped
data simultaneously. " +
+ "Headers store can upgrade from either plain or
timestamped format, but not both."
+ );
+ }
}
// close default column family handle
columnFamilies.get(0).close();
- }
-
- final ColumnFamilyHandle legacyTimestampedCf = columnFamilies.get(1);
- final ColumnFamilyHandle headersCf = columnFamilies.get(2);
- final ColumnFamilyHandle offsetsCf = columnFamilies.get(3);
-
- // Check if legacy timestamped CF has data
- try (final RocksIterator legacyIter =
db.newIterator(legacyTimestampedCf)) {
- legacyIter.seekToFirst();
- if (legacyIter.isValid()) {
- log.info("Opening store {} in upgrade mode from timestamped
store", name);
- cfAccessor = new DualColumnFamilyAccessor(
- offsetsCf,
- legacyTimestampedCf,
- headersCf,
- HeadersBytesStore::convertToHeaderFormat,
- this,
- open
- );
- } else {
- log.info("Opening store {} in regular headers-aware mode",
name);
- cfAccessor = new SingleColumnFamilyAccessor(offsetsCf,
headersCf);
- try {
- db.dropColumnFamily(legacyTimestampedCf);
- } catch (final RocksDBException e) {
- throw new RuntimeException(e);
- } finally {
- legacyTimestampedCf.close();
+ final ColumnFamilyHandle legacyTimestampedCf =
columnFamilies.get(1);
+ final ColumnFamilyHandle headersCf = columnFamilies.get(2);
+ final ColumnFamilyHandle offsetsCf = columnFamilies.get(3);
+
+ // Check if legacy timestamped CF has data
+ try (final RocksIterator legacyIter =
db.newIterator(legacyTimestampedCf)) {
+ legacyIter.seekToFirst();
+ if (legacyIter.isValid()) {
+ log.info("Opening store {} in upgrade mode from
timestamped store", name);
+ cfAccessor = new DualColumnFamilyAccessor(
+ offsetsCf,
+ legacyTimestampedCf,
+ headersCf,
+ HeadersBytesStore::convertToHeaderFormat,
+ this,
+ open
+ );
+ } else {
+ log.info("Opening store {} in regular headers-aware mode",
name);
+ cfAccessor = new SingleColumnFamilyAccessor(offsetsCf,
headersCf);
+ try {
+ db.dropColumnFamily(legacyTimestampedCf);
+ } catch (final RocksDBException e) {
+ throw new RuntimeException(e);
+ } finally {
+ legacyTimestampedCf.close();
+ }
}
}
+ } catch (final RuntimeException e) {
+ for (final ColumnFamilyHandle handle : columnFamilies) {
+ handle.close();
+ }
+ throw e;
}
}