hu6360567 opened a new issue, #185:
URL: https://github.com/apache/arrow-java/issues/185

   ### Describe the bug, including details regarding any error messages, version, and platform.
   
   I'm trying to import/export data to and from a database in Python through an `ArrayStream`, using pyarrow.jvm and JDBC.
   
   To export an ArrowVectorIterator as a stream without first unloading it to RecordBatches on the Java side, I wrap the ArrowVectorIterator in an ArrowReader as below:
   ```java
   public class ArrowVectorIteratorReader extends ArrowReader {
   
       private final Iterator<VectorSchemaRoot> iterator;
       private final Schema schema;
       private VectorSchemaRoot root;
   
       public ArrowVectorIteratorReader(BufferAllocator allocator, Iterator<VectorSchemaRoot> iterator, Schema schema) {
           super(allocator);
           this.iterator = iterator;
           this.schema = schema;
           this.root = null;
       }
   
       @Override
       public VectorSchemaRoot getVectorSchemaRoot() throws IOException {
           if (root == null) return super.getVectorSchemaRoot();
           return root;
       }
   
       @Override
       public boolean loadNextBatch() throws IOException {
           if (iterator.hasNext()) {
               VectorSchemaRoot lastRoot = root;
               root = iterator.next();
               if (root != lastRoot && lastRoot != null) lastRoot.close();
               return true;
           } else {
               return false;
           }
       }
   
       @Override
       public long bytesRead() {
           return 0;
       }
   
       @Override
       protected void closeReadSource() throws IOException {
           if (iterator instanceof AutoCloseable) {
               try {
                   ((AutoCloseable) iterator).close();
               } catch (Exception e) {
                   throw new IOException(e);
               }
           }
           // root is still null if loadNextBatch() never returned a batch
           if (root != null) root.close();
       }
   
       @Override
       protected Schema readSchema() throws IOException {
           return schema;
       }
   }
   
   ```
   
   When the ArrowVectorIterator is configured with `reuseVectorSchemaRoot` enabled, utf8 arrays may be corrupted on the Python side, although they work as expected on the Java side.
   
   The Java code is as follows:
   ```java
   // porter.importData(1) returns an ArrowVectorIteratorReader with batchSize=1
   try (final ArrowReader source = porter.importData(1);
        final ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) {
       Data.exportArrayStream(allocator, source, stream);
   
       try (final ArrowReader reader = Data.importArrayStream(allocator, stream)) {
           while (reader.loadNextBatch()) {
               // the root from getVectorSchemaRoot() is valid for every vector
               totalRecord += reader.getVectorSchemaRoot().getRowCount();
           }
       }
   }
   ```
   
   On the Python side, the behavior is hard to explain.
   The stream exported from Java is wrapped in a RecordBatchReader and written to different file formats.
   ```python
   def wrap_from_java_stream_to_generator(java_arrow_stream, allocator=None, yield_schema=False):
       if allocator is None:
           allocator = get_java_root_allocator().allocator
       c_stream = arrow_c.new("struct ArrowArrayStream*")
       c_stream_ptr = int(arrow_c.cast("uintptr_t", c_stream))
   
       org = jpype.JPackage("org")
       java_wrapped_stream = org.apache.arrow.c.ArrowArrayStream.wrap(c_stream_ptr)
   
       org.apache.arrow.c.Data.exportArrayStream(allocator, java_arrow_stream, java_wrapped_stream)
   
       # noinspection PyProtectedMember
       with pa.RecordBatchReader._import_from_c(c_stream_ptr) as reader:  # type: pa.RecordBatchReader
           if yield_schema:
               yield reader.schema
           yield from reader
   
   
   def wrap_from_java_stream(java_arrow_stream, allocator=None):
       generator = wrap_from_java_stream_to_generator(java_arrow_stream, allocator, yield_schema=True)
       schema = next(generator)
   
       return pa.RecordBatchReader.from_batches(schema, generator)
   ```
   For CSV, this works as expected:
   ```python
   with wrap_from_java_stream(java_arrow_stream, allocator) as stream:
       with pa.csv.CSVWriter(csv_path, stream.schema) as writer:
           for record_batch in stream:
               writer.write_batch(record_batch)
   ```
   
   For Parquet, writing with the dataset API as below fails with one of the two errors that follow:
   ```python
   with wrap_from_java_stream(java_arrow_stream, allocator) as stream:
       pa.dataset.write_dataset(stream, data_path, format="parquet")
   ```
   
   ```
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
   ......./python3.8/site-packages/pyarrow/dataset.py:999: in write_dataset
       _filesystemdataset_write(
   pyarrow/_dataset.pyx:3655: in pyarrow._dataset._filesystemdataset_write
       ???
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
   
   >   ???
   E   pyarrow.lib.ArrowInvalid: Parquet cannot store strings with size 2GB or more
   ```
   OR
   ```
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
   
   >   ???
   E   pyarrow.lib.ArrowInvalid: Column 1: In chunk 0: Invalid: Length spanned by binary offsets (7) larger than values array (size 6)
   ```
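
For context on the second message: in the Arrow columnar format, a utf8 array of n values has an offsets buffer with n + 1 int32 entries plus a values buffer, and value i occupies the bytes from offsets[i] up to offsets[i+1]. The sketch below restates that invariant in plain Python. It is not pyarrow's validation code; `check_utf8_buffers` is a hypothetical helper and the sample bytes are made up for illustration.

```python
import struct


def check_utf8_buffers(offsets_buf: bytes, values_buf: bytes, length: int) -> None:
    """Raise ValueError if the offsets cannot describe `length` strings in values_buf.

    Hypothetical helper for illustration only, not part of pyarrow.
    """
    # An array of `length` strings needs `length + 1` little-endian int32 offsets.
    offsets = struct.unpack(f"<{length + 1}i", offsets_buf[: 4 * (length + 1)])
    if any(a > b for a, b in zip(offsets, offsets[1:])):
        raise ValueError("offsets are not monotonically non-decreasing")
    if offsets[0] < 0 or offsets[-1] > len(values_buf):
        raise ValueError(
            f"offsets span up to byte {offsets[-1]} "
            f"but the values buffer has only {len(values_buf)} bytes"
        )


# A consistent pair: one 6-byte string, offsets [0, 6].
check_utf8_buffers(struct.pack("<2i", 0, 6), b"abcdef", 1)

# Offsets that describe a 7-byte string paired with a 6-byte values buffer,
# the same shape of mismatch as "offsets (7) larger than values array (size 6)".
try:
    check_utf8_buffers(struct.pack("<2i", 0, 7), b"abcdef", 1)
except ValueError as exc:
    print(exc)
```

A mismatch of this kind means the offsets buffer and the values buffer seen by the consumer do not belong to the same batch contents.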
   
   To find out which record raises the error, the RecordBatchReader is wrapped so that it yields single-row batches and logs their content, as below:
   ```python
   with wrap_from_java_stream(java_arrow_stream, allocator) as stream:
       def generator():
           for rb in stream:
               for i in range(rb.num_rows):
                   slice = rb.slice(i, 1)
                   logger.info(slice.to_pylist())
                   yield slice
   
       pa.dataset.write_dataset(
           pa.RecordBatchReader.from_batches(stream.schema, generator()),
           data_path,
           format="parquet",
       )
   ```
   Although the logger can print each slice, write_dataset fails:
   ```
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
   ......./python3.8/site-packages/pyarrow/dataset.py:999: in write_dataset
       _filesystemdataset_write(
   pyarrow/_dataset.pyx:3655: in pyarrow._dataset._filesystemdataset_write
       ???
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
   
   >   ???
   E   pyarrow.lib.ArrowInvalid: Column 1: In chunk 0: Invalid: First or last binary offset out of bounds
   ```
   
   For the Arrow/Feather format, the record batches seem to be written directly to the file, but they turn out to be invalid when read back from the file (the code is similar to the above).
   
   
   However, if I create the ArrowVectorIteratorReader without reuseVectorSchemaRoot, everything works fine on the Python side.
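
One unconfirmed hypothesis that would be consistent with these observations (CSV written batch by batch works, each slice logs correctly, dataset writing fails, disabling reuse fixes it) is that batches handed to Python still point at memory that the reused root overwrites when the next batch is loaded. The toy model below only illustrates that aliasing pattern with plain Python buffers. It does not use Arrow at all, the names `reusing_producer` and `fresh_producer` are hypothetical, and it is not a diagnosis of the actual cause.

```python
from typing import Iterator, List


def reusing_producer(rows: List[bytes]) -> Iterator[memoryview]:
    """Yield every row through ONE shared buffer (toy stand-in for a reused root)."""
    shared = bytearray(max(len(r) for r in rows))
    for row in rows:
        shared[: len(row)] = row  # overwrite the previous row in place
        yield memoryview(shared)[: len(row)]  # zero-copy view into the shared buffer


def fresh_producer(rows: List[bytes]) -> Iterator[memoryview]:
    """Yield every row from its own buffer (toy stand-in for reuse disabled)."""
    for row in rows:
        yield memoryview(bytes(row))


rows = [b"alpha", b"bravo", b"delta"]

# A streaming consumer copies each batch before requesting the next one.
streamed = [bytes(view) for view in reusing_producer(rows)]

# A buffering consumer holds on to the views and reads them later.
held = list(reusing_producer(rows))
buffered = [bytes(view) for view in held]

# With a fresh buffer per batch, holding the views is harmless.
buffered_fresh = [bytes(view) for view in list(fresh_producer(rows))]

print(streamed)        # [b'alpha', b'bravo', b'delta']
print(buffered)        # [b'delta', b'delta', b'delta']
print(buffered_fresh)  # [b'alpha', b'bravo', b'delta']
```

In this model only the consumer that keeps earlier batches alive while the producer advances sees wrong data, which is the pattern a writer that queues batches would follow.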
   
   ### Component(s)
   
   Java, Python

