muyihao opened a new issue, #144:
URL: https://github.com/apache/arrow-java/issues/144

   ### Describe the usage question you have. Please include as many useful details as possible.
   
   
   In my Flink process function, I receive serialized VectorSchemaRoot data that I need to deserialize for further processing. Because I have to operate on it row by row, I use the slice function to split each batch into single-row roots. However, passing the sliced roots downstream directly increases direct memory usage, which in turn reduces the heap memory available to Flink and eventually results in an out-of-memory (OOM) exception. If I instead serialize each sliced VectorSchemaRoot and pass the bytes to the downstream function, which then deserializes them again, the OOM no longer occurs.
   
   ```
   public void processElement(I value, ProcessFunction<I, Object>.Context context, Collector<Object> collector) throws Exception {
       if (this.writerHelper == null) {
           initWriterHelper();
       }
       reader = new ArrowStreamReader(new ByteArrayInputStream((byte[]) value), ArrowUtil.rootAllocator);
       try {
           while (reader.loadNextBatch()) {
               VectorSchemaRoot vsr = reader.getVectorSchemaRoot();
               int rowCount = vsr.getRowCount();
               for (int i = 0; i < rowCount; i++) {
                   // split the batch into single-row slices
                   VectorSchemaRoot row = vsr.slice(i, 1);
                   // this.writerHelper.write(row);
                   // The approach above can result in a memory leak,
                   // whereas the following method will not.

                   ByteArrayOutputStream out = new ByteArrayOutputStream();
                   ArrowStreamWriter writer =
                           new ArrowStreamWriter(row, null, Channels.newChannel(out));
                   writer.start();
                   writer.writeBatch();
                   this.writerHelper.write(out.toByteArray());
                   row.clear();
                   row.close();
               }
               vsr.clear();
               vsr.close();
           }
       } catch (Exception ex) {
           ex.printStackTrace();
       } finally {
           reader.close();
       }
   }
   ```
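
One possible reading of the behavior described above, offered as a sketch rather than a diagnosis: if a slice shares its parent's reference-counted buffers instead of copying them, then every slice that is handed downstream and never closed keeps the whole batch allocated, while copying a row into a heap `byte[]` allows the slice to be closed immediately. The toy model below illustrates only that idea in plain Java. `ToyAllocator`, `ToyBuffer`, and `ToySlice` are hypothetical names invented for this illustration and are not Arrow classes, and the assumption that Arrow slices behave this way is not confirmed by the issue text.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model only: these classes are hypothetical and are NOT Arrow's API. */
class SliceRetentionSketch {

    /** Stand-in for an allocator that tracks outstanding buffer bytes. */
    static final class ToyAllocator {
        long allocatedBytes = 0;
    }

    /** Stand-in for a reference-counted buffer shared by a batch and its slices. */
    static final class ToyBuffer {
        final ToyAllocator allocator;
        final byte[] data; // heap array standing in for direct memory
        int refCount = 1;  // the creator holds the first reference

        ToyBuffer(ToyAllocator allocator, int size) {
            this.allocator = allocator;
            this.data = new byte[size];
            allocator.allocatedBytes += size;
        }

        void retain() {
            refCount++;
        }

        void release() {
            // Memory is returned only when the last reference is dropped.
            if (--refCount == 0) {
                allocator.allocatedBytes -= data.length;
            }
        }
    }

    /** A view over part of a shared buffer; closing it drops one reference. */
    static final class ToySlice implements AutoCloseable {
        final ToyBuffer buffer;
        final int offset;
        final int length;

        ToySlice(ToyBuffer buffer, int offset, int length) {
            buffer.retain(); // the slice pins the whole parent buffer
            this.buffer = buffer;
            this.offset = offset;
            this.length = length;
        }

        /** Copies the viewed bytes out, analogous to serializing a row. */
        byte[] copyToHeap() {
            return Arrays.copyOfRange(buffer.data, offset, offset + length);
        }

        @Override
        public void close() {
            buffer.release();
        }
    }

    /** Hands un-closed slices downstream: the batch stays allocated. */
    static long bytesHeldWhenPassingSlices(int rows) {
        ToyAllocator allocator = new ToyAllocator();
        ToyBuffer batch = new ToyBuffer(allocator, rows);
        List<ToySlice> downstream = new ArrayList<>();
        for (int i = 0; i < rows; i++) {
            downstream.add(new ToySlice(batch, i, 1));
        }
        batch.release(); // the producer lets go, but the slices still hold references
        return allocator.allocatedBytes;
    }

    /** Copies each row to the heap and closes the slice right away. */
    static long bytesHeldWhenCopyingRows(int rows) {
        ToyAllocator allocator = new ToyAllocator();
        ToyBuffer batch = new ToyBuffer(allocator, rows);
        List<byte[]> downstream = new ArrayList<>();
        for (int i = 0; i < rows; i++) {
            try (ToySlice row = new ToySlice(batch, i, 1)) {
                downstream.add(row.copyToHeap());
            }
        }
        batch.release(); // last reference dropped, buffer memory returned
        return allocator.allocatedBytes;
    }

    public static void main(String[] args) {
        System.out.println(bytesHeldWhenPassingSlices(1000)); // prints 1000
        System.out.println(bytesHeldWhenCopyingRows(1000));   // prints 0
    }
}
```

In this model the first method still reports the full batch size after the producer releases it, because each retained slice holds a reference, while the second reports zero. Whether the same mechanism explains the Flink OOM depends on what `writerHelper.write(row)` does with the root it receives, which the issue does not show.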
   
   ### Component(s)
   
   Java

