Ma1oneZhang opened a new issue, #50:
URL: https://github.com/apache/arrow-java/issues/50

   ### Describe the bug, including details regarding any error messages, 
version, and platform.
   
   Hi arrow maintainer, i find some memory leak in my flight-sql usage. I use 
arrow-16.0.0, and works on AMD Ryzen 3970X machine.
   ```java
   package org.example;
   
   import org.apache.arrow.flight.*;
   import org.apache.arrow.flight.grpc.CredentialCallOption;
   import org.apache.arrow.flight.sql.FlightSqlClient;
   import org.apache.arrow.memory.BufferAllocator;
   import org.apache.arrow.memory.RootAllocator;
   import org.apache.arrow.vector.*;
   import org.apache.arrow.vector.types.pojo.Field;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   import java.util.*;
   
   public class SqlRunner {
   
       private static final Logger log = 
LoggerFactory.getLogger(SqlRunner.class);
   
       static void run_flight_sql() throws Exception {
           try (BufferAllocator allocator = new 
RootAllocator(Integer.MAX_VALUE)) {
               final Location clientLocation = 
Location.forGrpcInsecure("127.0.0.1", 8360);
               try (FlightClient client = FlightClient.builder(allocator, 
clientLocation).build();
                    FlightSqlClient sqlClient = new FlightSqlClient(client)) {
   
                   Optional<CredentialCallOption> credentialCallOption = 
client.authenticateBasicToken("admin", "public");
                   CallHeaders headers = new FlightCallHeaders();
                   headers.insert("database", "test");
   
                   Set<CallOption> options = new HashSet<>();
                   credentialCallOption.ifPresent(options::add);
                   options.add(new HeaderCallOption(headers));
                  // use the sql to query is ok
                  try {
                      String query = "SELECT count(*) from test.sx1;";
                      executeQuery(sqlClient, query, options);
                  } catch (Exception e){
                      e.printStackTrace();
                      throw e;
                  }
   
                  // where memory leak happens
                   try {
                       try (FlightSqlClient.PreparedStatement preparedStatement 
= sqlClient.prepare("insert into table sx1 (sid, value, flag) values(?, ?, 
?);", options.toArray(new CallOption[0]))) {
                           insertBatch(sqlClient, preparedStatement, allocator, 
options);
                       }
                   } catch (Exception e){
                       e.printStackTrace();
                   }
               }
           }
       }
   
       private static void executeQuery(FlightSqlClient sqlClient, String 
query, Set<CallOption> options) throws Exception {
           final FlightInfo info = sqlClient.execute(query, options.toArray(new 
CallOption[0]));
           final Ticket ticket = info.getEndpoints().get(0).getTicket();
           try (FlightStream stream = sqlClient.getStream(ticket, 
options.toArray(new CallOption[0]))) {
               while (stream.next()) {
                   try (VectorSchemaRoot schemaRoot = stream.getRoot()) {
                       log.info(schemaRoot.contentToTSVString());
                   }
               }
           }
       }
   
       private static void insertBatch(FlightSqlClient sqlClient, 
FlightSqlClient.PreparedStatement preparedStatement, BufferAllocator allocator, 
Set<CallOption> options) throws Exception {
           try (IntVector sids = new IntVector("sid", allocator);
                Float4Vector values = new Float4Vector("value", allocator);
                TinyIntVector flags = new TinyIntVector("flag", allocator)) {
   
               sids.allocateNew(100);
               values.allocateNew(100);
               flags.allocateNew(100);
   
               for (int i = 0; i < 100; i++) {
                   sids.setSafe(i, i);
                   values.setSafe(i, (float) i);
                   flags.setSafe(i, (byte) i);
               }
   
               List<Field> fields = Arrays.asList(sids.getField(), 
values.getField(), flags.getField());
               List<FieldVector> fieldVectors = Arrays.asList(sids, values, 
flags);
               try (VectorSchemaRoot vectorSchemaRoot = new 
VectorSchemaRoot(fields, fieldVectors)){
                   vectorSchemaRoot.setRowCount(100);
   
                   preparedStatement.setParameters(vectorSchemaRoot);
                   FlightInfo info = preparedStatement.execute();
   
                   final Ticket ticket = info.getEndpoints().get(0).getTicket();
                   try (FlightStream stream = sqlClient.getStream(ticket, 
options.toArray(new CallOption[0]))) {
                       while (stream.next()) {
                           try (VectorSchemaRoot schemaRoot = stream.getRoot()) 
{
                               List<FieldVector> vectors = 
schemaRoot.getFieldVectors();
                               for (int i = 0; i < vectors.size(); i++) {
                                   System.out.printf("%d %s\n", i, 
vectors.get(i));
                               }
                           }
                       }
                   }
                   preparedStatement.clearParameters();
               }
           }
       }
   
       public static void main(String[] args) throws Exception {
           run_flight_sql();
       }
   }
   ```
   
   That's the memory allocator verboseString, is it a bug? or server side bad 
implement?
   ```bash
   =================================================================
   Allocator(ROOT) 0/16/1424/2147483647 (res/actual/peak/limit)
     child allocators: 1
       Allocator(flight-client) 0/16/272/9223372036854775807 
(res/actual/peak/limit)
         child allocators: 0
         ledgers: 1
           ledger[6] allocator: flight-client), isOwning: , size: , references: 
1, life: 1242736490302663..0, allocatorManager: [, life: ] holds 1 buffers. 
               ArrowBuf[23], address:139720841494544, capacity:16
          event log for: ArrowBuf[23]
            1242736490544534 create()
                     at 
org.apache.arrow.memory.util.HistoricalLog$Event.<init>(HistoricalLog.java:180)
                     at 
org.apache.arrow.memory.util.HistoricalLog.recordEvent(HistoricalLog.java:85)
                     at 
org.apache.arrow.memory.ArrowBuf.<init>(ArrowBuf.java:98)
                     at 
org.apache.arrow.memory.BufferLedger.newArrowBuf(BufferLedger.java:259)
                     at 
org.apache.arrow.memory.BaseAllocator.bufferWithoutReservation(BaseAllocator.java:352)
                     at 
org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:328)
                     at 
org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:291)
                     at 
org.apache.arrow.flight.PutResult.fromProtocol(PutResult.java:82)
                     at 
org.apache.arrow.flight.FlightClient$SetStreamObserver.onNext(FlightClient.java:466)
                     at 
org.apache.arrow.flight.FlightClient$SetStreamObserver.onNext(FlightClient.java:454)
                     at 
io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:468)
                     at 
io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:33)
                     at 
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInternal(ClientCallImpl.java:657)
                     at 
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:644)
                     at 
io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
                     at 
io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
                     at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                     at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                     at java.lang.Thread.run(Thread.java:750)
   
         reservations: 0
     ledgers: 0
     reservations: 0
   
   =================================================================
   ```
   
   ### Component(s)
   
   Java


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@arrow.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to