This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-java.git


The following commit(s) were added to refs/heads/main by this push:
     new ad59035ec GH-990: [JDBC] Fix memory leak on Connection#close due to 
unclosed ResultSet(s) (#991)
ad59035ec is described below

commit ad59035ec880920f285158a140467d8b8d41789c
Author: Pedro Matias <[email protected]>
AuthorDate: Tue Jan 27 22:25:29 2026 +0000

    GH-990: [JDBC] Fix memory leak on Connection#close due to unclosed 
ResultSet(s) (#991)
    
    ## What's Changed
    
    Closing a Connection when there was one or more unclosed ResultSet that
    had been obtained
    via methods of the interface DatabaseMetaData would generate exceptions
    due to memory leaks.
    
    Now, closing a Connection will first close all the ResultSet instances
    obtained from DatabaseMetadata
    instances associated with that Connection.
    
    Closes #990.
---
 .../arrow/driver/jdbc/ArrowFlightConnection.java   | 32 ++++++++++
 .../jdbc/ArrowFlightJdbcFlightStreamResultSet.java |  7 +++
 .../ArrowFlightJdbcVectorSchemaRootResultSet.java  | 11 ++--
 .../apache/arrow/driver/jdbc/ConnectionTest.java   | 70 ++++++++++++++++++++++
 4 files changed, 113 insertions(+), 7 deletions(-)

diff --git 
a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java
 
b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java
index 0e9c198f5..623c2b81b 100644
--- 
a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java
+++ 
b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java
@@ -21,6 +21,8 @@ import static 
org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -42,6 +44,8 @@ public final class ArrowFlightConnection extends 
AvaticaConnection {
   private final ArrowFlightSqlClientHandler clientHandler;
   private final ArrowFlightConnectionConfigImpl config;
   private ExecutorService executorService;
+  private int metadataResultSetCount;
+  private Map<Integer, ArrowFlightJdbcFlightStreamResultSet> 
metadataResultSetMap = new HashMap<>();
 
   /**
    * Creates a new {@link ArrowFlightConnection}.
@@ -66,6 +70,7 @@ public final class ArrowFlightConnection extends 
AvaticaConnection {
     this.config = Preconditions.checkNotNull(config, "Config cannot be null.");
     this.allocator = Preconditions.checkNotNull(allocator, "Allocator cannot 
be null.");
     this.clientHandler = Preconditions.checkNotNull(clientHandler, "Handler 
cannot be null.");
+    this.metadataResultSetCount = 0;
   }
 
   /**
@@ -173,6 +178,31 @@ public final class ArrowFlightConnection extends 
AvaticaConnection {
             : executorService;
   }
 
+  /**
+   * Registers a new metadata ResultSet and assigns it a unique ID. Metadata 
ResultSets are those
+   * created without an associated Statement.
+   *
+   * @param resultSet the ResultSet to register
+   * @return the assigned ID
+   */
+  int getNewMetadataResultSetId(ArrowFlightJdbcFlightStreamResultSet 
resultSet) {
+    metadataResultSetMap.put(metadataResultSetCount, resultSet);
+    return metadataResultSetCount++;
+  }
+
+  /**
+   * Unregisters a metadata ResultSet when it is closed. This method is called 
by metadata
+   * ResultSets during their close operation to remove themselves from the 
tracking map.
+   *
+   * @param id the ID of the ResultSet to unregister, or null if not a 
metadata ResultSet
+   */
+  void onResultSetClose(Integer id) {
+    if (id == null) {
+      return;
+    }
+    metadataResultSetMap.remove(id);
+  }
+
   @Override
   public Properties getClientInfo() {
     final Properties copy = new Properties();
@@ -190,7 +220,9 @@ public final class ArrowFlightConnection extends 
AvaticaConnection {
     } catch (final Exception e) {
       topLevelException = e;
     }
+    // copies of the collections are used to avoid concurrent modification 
problems
     ArrayList<AutoCloseable> closeables = new 
ArrayList<>(statementMap.values());
+    closeables.addAll(new ArrayList<>(metadataResultSetMap.values()));
     closeables.add(clientHandler);
     closeables.addAll(allocator.getChildAllocators());
     closeables.add(allocator);
diff --git 
a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFlightStreamResultSet.java
 
b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFlightStreamResultSet.java
index aabaf01e6..2885f7895 100644
--- 
a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFlightStreamResultSet.java
+++ 
b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFlightStreamResultSet.java
@@ -54,6 +54,7 @@ public final class ArrowFlightJdbcFlightStreamResultSet
   private VectorSchemaRoot currentVectorSchemaRoot;
 
   private Schema schema;
+  private Integer id = null; // used for metadata result sets only
 
   /** Public constructor used by ArrowFlightJdbcFactory. */
   ArrowFlightJdbcFlightStreamResultSet(
@@ -82,6 +83,7 @@ public final class ArrowFlightJdbcFlightStreamResultSet
     super(null, state, signature, resultSetMetaData, timeZone, firstFrame);
     this.connection = connection;
     this.flightInfo = flightInfo;
+    this.id = connection.getNewMetadataResultSetId(this);
   }
 
   /**
@@ -234,7 +236,12 @@ public final class ArrowFlightJdbcFlightStreamResultSet
 
   @Override
   public synchronized void close() {
+
     try {
+      if (isClosed()) {
+        return;
+      }
+      this.connection.onResultSetClose(id);
       if (flightEndpointDataQueue != null) {
         // flightStreamQueue should close currentFlightStream internally
         flightEndpointDataQueue.close();
diff --git 
a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcVectorSchemaRootResultSet.java
 
b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcVectorSchemaRootResultSet.java
index 622e5fe7f..49334951d 100644
--- 
a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcVectorSchemaRootResultSet.java
+++ 
b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcVectorSchemaRootResultSet.java
@@ -22,7 +22,6 @@ import java.sql.SQLException;
 import java.sql.Types;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Objects;
 import java.util.Set;
 import java.util.TimeZone;
 import org.apache.arrow.driver.jdbc.utils.ConvertUtils;
@@ -159,12 +158,10 @@ public class ArrowFlightJdbcVectorSchemaRootResultSet 
extends AvaticaResultSet {
     } catch (final Exception e) {
       exceptions.add(e);
     }
-    if (!Objects.isNull(statement)) {
-      try {
-        super.close();
-      } catch (final Exception e) {
-        exceptions.add(e);
-      }
+    try {
+      super.close();
+    } catch (final Exception e) {
+      exceptions.add(e);
     }
     exceptions.parallelStream().forEach(e -> LOGGER.error(e.getMessage(), e));
     exceptions.stream()
diff --git 
a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ConnectionTest.java
 
b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ConnectionTest.java
index dbedbe9d3..55722f60f 100644
--- 
a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ConnectionTest.java
+++ 
b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ConnectionTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.arrow.driver.jdbc;
 
+import static java.lang.String.format;
+import static java.util.stream.IntStream.range;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -23,24 +25,33 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
+import com.google.protobuf.Message;
 import java.net.URISyntaxException;
 import java.sql.Connection;
 import java.sql.Driver;
 import java.sql.DriverManager;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Map;
 import java.util.Properties;
+import java.util.function.Consumer;
 import org.apache.arrow.driver.jdbc.authentication.UserPasswordAuthentication;
 import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler;
 import 
org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty;
 import org.apache.arrow.driver.jdbc.utils.MockFlightSqlProducer;
 import org.apache.arrow.flight.FlightMethod;
+import org.apache.arrow.flight.FlightProducer.ServerStreamListener;
 import org.apache.arrow.flight.NoOpSessionOptionValueVisitor;
 import org.apache.arrow.flight.SessionOptionValue;
+import org.apache.arrow.flight.sql.FlightSqlProducer.Schemas;
+import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetTableTypes;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.util.Text;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -698,4 +709,63 @@ public class ConnectionTest {
       assertTrue(statements[i].isClosed());
     }
   }
+
+  @Test
+  public void testResultSetsFromDatabaseMetadataClosedOnConnectionClose() 
throws Exception {
+    // set up the FlightProducer to respond to metadata queries
+    // getTableTypes() is being used, but any other method would work
+    int rowCount = 3;
+    final Message commandGetTableTypes = 
CommandGetTableTypes.getDefaultInstance();
+    final Consumer<ServerStreamListener> commandGetTableTypesResultProducer =
+        listener -> {
+          try (final BufferAllocator allocator = new RootAllocator();
+              final VectorSchemaRoot root =
+                  VectorSchemaRoot.create(Schemas.GET_TABLE_TYPES_SCHEMA, 
allocator)) {
+            final VarCharVector tableType = (VarCharVector) 
root.getVector("table_type");
+            range(0, rowCount)
+                .forEach(i -> tableType.setSafe(i, new Text(format("table_type 
#%d", i))));
+            root.setRowCount(rowCount);
+            listener.start(root);
+            listener.putNext();
+          } catch (final Throwable throwable) {
+            listener.error(throwable);
+          } finally {
+            listener.completed();
+          }
+        };
+    PRODUCER.addCatalogQuery(commandGetTableTypes, 
commandGetTableTypesResultProducer);
+
+    // create a connection
+    final Properties properties = new Properties();
+    properties.put(ArrowFlightConnectionProperty.HOST.camelName(), 
"localhost");
+    properties.put(
+        ArrowFlightConnectionProperty.PORT.camelName(), 
FLIGHT_SERVER_TEST_EXTENSION.getPort());
+    properties.put(ArrowFlightConnectionProperty.USER.camelName(), userTest);
+    properties.put(ArrowFlightConnectionProperty.PASSWORD.camelName(), 
passTest);
+    properties.put("useEncryption", false);
+
+    Connection connection =
+        DriverManager.getConnection(
+            "jdbc:arrow-flight-sql://"
+                + FLIGHT_SERVER_TEST_EXTENSION.getHost()
+                + ":"
+                + FLIGHT_SERVER_TEST_EXTENSION.getPort(),
+            properties);
+
+    // create ResultSets from DatabaseMetadata
+    int numResultSets = 3;
+    ResultSet[] resultSets = new ResultSet[numResultSets];
+    for (int i = 0; i < numResultSets; i++) {
+      resultSets[i] = connection.getMetaData().getTableTypes();
+      assertFalse(resultSets[i].isClosed());
+    }
+
+    // close the connection
+    connection.close();
+
+    // assert the ResultSets are closed
+    for (int i = 0; i < numResultSets; i++) {
+      assertTrue(resultSets[i].isClosed());
+    }
+  }
 }

Reply via email to