Ma1oneZhang opened a new issue, #44213:
URL: https://github.com/apache/arrow/issues/44213
### Describe the bug, including details regarding any error messages,
version, and platform.
Hi arrow maintainer, i find some memory leak in my flight-sql usage.
```java
package org.example;
import org.apache.arrow.flight.*;
import org.apache.arrow.flight.grpc.CredentialCallOption;
import org.apache.arrow.flight.sql.FlightSqlClient;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.*;
import org.apache.arrow.vector.types.pojo.Field;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
public class SqlRunner {
private static final Logger log =
LoggerFactory.getLogger(SqlRunner.class);
static void run_flight_sql() throws Exception {
try (BufferAllocator allocator = new
RootAllocator(Integer.MAX_VALUE)) {
final Location clientLocation =
Location.forGrpcInsecure("127.0.0.1", 8360);
try (FlightClient client = FlightClient.builder(allocator,
clientLocation).build();
FlightSqlClient sqlClient = new FlightSqlClient(client)) {
Optional<CredentialCallOption> credentialCallOption =
client.authenticateBasicToken("admin", "public");
CallHeaders headers = new FlightCallHeaders();
headers.insert("database", "test");
Set<CallOption> options = new HashSet<>();
credentialCallOption.ifPresent(options::add);
options.add(new HeaderCallOption(headers));
// use the sql to query is ok
try {
String query = "SELECT count(*) from test.sx1;";
executeQuery(sqlClient, query, options);
} catch (Exception e){
e.printStackTrace();
throw e;
}
// where memory leak happens
try {
try (FlightSqlClient.PreparedStatement preparedStatement
= sqlClient.prepare("insert into table sx1 (sid, value, flag) values(?, ?,
?);", options.toArray(new CallOption[0]))) {
insertBatch(sqlClient, preparedStatement, allocator,
options);
}
} catch (Exception e){
e.printStackTrace();
}
}
}
}
private static void executeQuery(FlightSqlClient sqlClient, String
query, Set<CallOption> options) throws Exception {
final FlightInfo info = sqlClient.execute(query, options.toArray(new
CallOption[0]));
final Ticket ticket = info.getEndpoints().get(0).getTicket();
try (FlightStream stream = sqlClient.getStream(ticket,
options.toArray(new CallOption[0]))) {
while (stream.next()) {
try (VectorSchemaRoot schemaRoot = stream.getRoot()) {
log.info(schemaRoot.contentToTSVString());
}
}
}
}
private static void insertBatch(FlightSqlClient sqlClient,
FlightSqlClient.PreparedStatement preparedStatement, BufferAllocator allocator,
Set<CallOption> options) throws Exception {
try (IntVector sids = new IntVector("sid", allocator);
Float4Vector values = new Float4Vector("value", allocator);
TinyIntVector flags = new TinyIntVector("flag", allocator)) {
sids.allocateNew(100);
values.allocateNew(100);
flags.allocateNew(100);
for (int i = 0; i < 100; i++) {
sids.setSafe(i, i);
values.setSafe(i, (float) i);
flags.setSafe(i, (byte) i);
}
List<Field> fields = Arrays.asList(sids.getField(),
values.getField(), flags.getField());
List<FieldVector> fieldVectors = Arrays.asList(sids, values,
flags);
try (VectorSchemaRoot vectorSchemaRoot = new
VectorSchemaRoot(fields, fieldVectors)){
vectorSchemaRoot.setRowCount(100);
preparedStatement.setParameters(vectorSchemaRoot);
FlightInfo info = preparedStatement.execute();
final Ticket ticket = info.getEndpoints().get(0).getTicket();
try (FlightStream stream = sqlClient.getStream(ticket,
options.toArray(new CallOption[0]))) {
while (stream.next()) {
try (VectorSchemaRoot schemaRoot = stream.getRoot())
{
List<FieldVector> vectors =
schemaRoot.getFieldVectors();
for (int i = 0; i < vectors.size(); i++) {
System.out.printf("%d %s\n", i,
vectors.get(i));
}
}
}
}
preparedStatement.clearParameters();
}
}
}
public static void main(String[] args) throws Exception {
run_flight_sql();
}
}
```
That's the memory allocator verboseString, is it a bug? or server side bad
implement?
```bash
=================================================================
Allocator(ROOT) 0/16/1424/2147483647 (res/actual/peak/limit)
child allocators: 1
Allocator(flight-client) 0/16/272/9223372036854775807
(res/actual/peak/limit)
child allocators: 0
ledgers: 1
ledger[6] allocator: flight-client), isOwning: , size: , references:
1, life: 1242736490302663..0, allocatorManager: [, life: ] holds 1 buffers.
ArrowBuf[23], address:139720841494544, capacity:16
event log for: ArrowBuf[23]
1242736490544534 create()
at
org.apache.arrow.memory.util.HistoricalLog$Event.<init>(HistoricalLog.java:180)
at
org.apache.arrow.memory.util.HistoricalLog.recordEvent(HistoricalLog.java:85)
at
org.apache.arrow.memory.ArrowBuf.<init>(ArrowBuf.java:98)
at
org.apache.arrow.memory.BufferLedger.newArrowBuf(BufferLedger.java:259)
at
org.apache.arrow.memory.BaseAllocator.bufferWithoutReservation(BaseAllocator.java:352)
at
org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:328)
at
org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:291)
at
org.apache.arrow.flight.PutResult.fromProtocol(PutResult.java:82)
at
org.apache.arrow.flight.FlightClient$SetStreamObserver.onNext(FlightClient.java:466)
at
org.apache.arrow.flight.FlightClient$SetStreamObserver.onNext(FlightClient.java:454)
at
io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:468)
at
io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:33)
at
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInternal(ClientCallImpl.java:657)
at
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:644)
at
io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at
io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
reservations: 0
ledgers: 0
reservations: 0
=================================================================
```
### Component(s)
Java
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]