EnricoMi opened a new issue, #43552:
URL: https://github.com/apache/arrow/issues/43552

   ### Describe the bug, including details regarding any error messages, version, and platform.
   
   Fetching data via Apache Arrow Flight (C++, Java, and Python involved) and passing it to Apache DataFusion (Rust) fails with:
   
       Memory pointer from external source (e.g, FFI) is not aligned with the specified scalar type.
       Before importing buffer through FFI, please make sure the allocation is aligned.
   
   This is likely due to #32276 / #36400.
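   The error message asks for an aligned allocation. One generic way for a consumer to obtain one is to copy the bytes into memory that Rust allocated for the target type. The sketch below is only an illustration of that idea under stated assumptions: it assumes native-endian `i64` values, and `copy_to_aligned_i64` is a hypothetical helper, not an arrow-rs or DataFusion API.

   ```rust
   use std::mem::size_of;

   /// Hypothetical helper (not an arrow-rs API): copy native-endian i64 values
   /// out of a byte slice that may start at any address. The returned Vec<i64>
   /// owns a fresh allocation, which Rust always aligns for i64.
   fn copy_to_aligned_i64(bytes: &[u8]) -> Vec<i64> {
       bytes
           .chunks_exact(size_of::<i64>())
           .map(|chunk| {
               let mut raw = [0u8; 8];
               raw.copy_from_slice(chunk);
               i64::from_ne_bytes(raw)
           })
           .collect()
   }

   fn main() {
       let values = [1i64, -2, 3];
       // Prepend one padding byte so the i64 data does not start at the
       // beginning of the allocation, similar to the buffers in this report.
       let mut storage = vec![0u8];
       for v in &values {
           storage.extend_from_slice(&v.to_ne_bytes());
       }
       let shifted = &storage[1..];

       // The copy lives in a Vec<i64>, so its pointer is aligned for i64.
       let aligned = copy_to_aligned_i64(shifted);
       assert_eq!(aligned, values);
       assert_eq!(aligned.as_ptr().align_offset(std::mem::align_of::<i64>()), 0);
       println!("{:?}", aligned); // prints "[1, -2, 3]"
   }
   ```

   Copying costs one pass over the data; it trades the zero-copy property for a buffer that satisfies the alignment assertion.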
   
   Error:
   ```
   thread '<unnamed>' panicked at /root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/arrow-buffer-52.0.0/src/buffer/scalar.rs:138:17:
   Memory pointer from external source (e.g, FFI) is not aligned with the specified scalar type. Before importing buffer through FFI, please make sure the allocation is aligned.
   stack backtrace:
      0:     0x7f4d25f576ea - <std::sys::backtrace::_print::DisplayBacktrace as core::fmt::Display>::fmt::h584e154fdf2d8641
      1:     0x7f4d24a0eb7b - core::fmt::write::h810564c4cb1595da
      2:     0x7f4d25f252a2 - std::io::Write::write_fmt::ha00a1de7318f2a48
      3:     0x7f4d25f5cb29 - std::sys::backtrace::print::h6238e978e425409a
      4:     0x7f4d25f5c316 - std::panicking::default_hook::{{closure}}::h0acffbc0a684bdb8
      5:     0x7f4d25f5d6f5 - std::panicking::rust_panic_with_hook::hcdf40f293c76fc9f
      6:     0x7f4d25f5cec2 - std::panicking::begin_panic_handler::{{closure}}::hfd12f36809a34009
      7:     0x7f4d25f5ce59 - std::sys::backtrace::__rust_end_short_backtrace::h6a9267615b3cf1db
      8:     0x7f4d25f5ce44 - rust_begin_unwind
      9:     0x7f4d24a0d4f2 - core::panicking::panic_fmt::hcabcb14b752ed0b3
     10:     0x7f4d246177b1 - arrow_buffer::buffer::scalar::ScalarBuffer<T>::new::h04fe130fe772026c
     11:     0x7f4d254255a0 - arrow_array::array::get_offsets::h216d5a4c8918fc01
     12:     0x7f4d2438af33 - arrow_array::array::make_array::h2cb6f33b6d6b2c59
     13:     0x7f4d24390613 - <arrow_array::array::struct_array::StructArray as core::convert::From<arrow_data::data::ArrayData>>::from::h07d61a4146018136
     14:     0x7f4d24246b46 - <arrow_array::record_batch::RecordBatch as arrow::pyarrow::FromPyArrow>::from_pyarrow_bound::h1af3cfef0e2e6e5a
     15:     0x7f4d23f169cb - <core::iter::adapters::GenericShunt<I,R> as core::iter::traits::iterator::Iterator>::next::h46b0b854c3de8071
     16:     0x7f4d240b7375 - <alloc::vec::Vec<T> as arrow::pyarrow::FromPyArrow>::from_pyarrow_bound::he22d7662854ef688
     17:     0x7f4d23f1864b - <core::iter::adapters::GenericShunt<I,R> as core::iter::traits::iterator::Iterator>::next::hd78ea1387d583d39
     18:     0x7f4d240293e0 - pyo3::impl_::extract_argument::extract_argument::hf1fa544aa176067c
     19:     0x7f4d2413265f - datafusion_python::context::PySessionContext::__pymethod_create_dataframe__::h0938b7fdddde4e81
     20:     0x7f4d24026e61 - pyo3::impl_::trampoline::trampoline::h86dfdb741875b71a
     21:     0x7f4d2412a441 - datafusion_python::context::<impl pyo3::impl_::pyclass::PyMethods<datafusion_python::context::PySessionContext> for pyo3::impl_::pyclass::PyClassImplCollector<datafusion_python::context::PySessionContext>>::py_methods::ITEMS::trampoline::hd70e045974ca5aa8
     22:     0x560f47b75569 - <unknown>
     23:     0x560f47b5cb2b - _PyEval_EvalFrameDefault
     24:     0x560f47b746ac - _PyFunction_Vectorcall
     25:     0x560f47b5c935 - _PyEval_EvalFrameDefault
     26:     0x560f47b59096 - <unknown>
     27:     0x560f47c4ef66 - PyEval_EvalCode
     28:     0x560f47c79e98 - <unknown>
     29:     0x560f47c7379b - <unknown>
     30:     0x560f47c79be5 - <unknown>
     31:     0x560f47c790c8 - _PyRun_SimpleFileObject
     32:     0x560f47c78d13 - _PyRun_AnyFileObject
     33:     0x560f47c6b70e - Py_RunMain
     34:     0x560f47c41dfd - Py_BytesMain
     35:     0x7f4d5a029d90 - __libc_start_call_main
                                  at ./csu/../sysdeps/nptl/libc_start_call_main.h:58:16
     36:     0x7f4d5a029e40 - __libc_start_main_impl
                                  at ./csu/../csu/libc-start.c:392:3
     37:     0x560f47c41cf5 - _start
     38:                0x0 - <unknown>
   Traceback (most recent call last):
     File "/home/enrico/git/arrow-datafusion-issue/example.py", line 41, in <module>
       main(sys.argv[1])
     File "/home/enrico/git/arrow-datafusion-issue/example.py", line 33, in main
       df = ctx.create_dataframe([[batch for batch in partition] for partition in partitions])
   pyo3_runtime.PanicException: Memory pointer from external source (e.g, FFI) is not aligned with the specified scalar type. Before importing buffer through FFI, please make sure the allocation is aligned.
   ```
   
   Reproduce as follows:
   ```bash
   git clone --depth=1 https://github.com/apache/arrow.git
   git clone --depth=1 https://github.com/apache/arrow-testing.git
   
   python -m venv venv
   source venv/bin/activate
   pip install pyarrow pandas datafusion
   
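   # start the Flight server in the background (or run it in a separate terminal)
   python arrow/python/examples/flight/server.py &
   sleep 2  # give the server a moment to start listening
   RUST_BACKTRACE=1 python example.py arrow-testing/data/csv/aggregate_test_100.csv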
   ```
   
   with example.py:
   ```python
   import sys
   
   import datafusion
   import pyarrow
   import pyarrow.flight
   import pyarrow.csv as csv
   
   
   def push_data(client, path):
       my_table = csv.read_csv(path).select(["c1"])
       df = my_table.to_pandas()
       writer, _ = client.do_put(
           pyarrow.flight.FlightDescriptor.for_path("file"), my_table.schema
       )
       writer.write_table(my_table)
       writer.close()
   
   
   def get_data(client):
       descriptor = pyarrow.flight.FlightDescriptor.for_path("file")
       info = client.get_flight_info(descriptor)
       for endpoint in info.endpoints:
           for location in endpoint.locations:
               get_client = pyarrow.flight.FlightClient(location)
               reader = get_client.do_get(endpoint.ticket)
               yield reader.to_reader()
   
   
   def main(path):
       client = pyarrow.flight.FlightClient("grpc+tcp://localhost:5005")
       push_data(client, path)
       partitions = get_data(client)
   
       ctx = datafusion.SessionContext()
       df = ctx.create_dataframe(
           [[batch for batch in partition] for partition in partitions]
       )
       print(df)
   
   
   if __name__ == '__main__':
       if len(sys.argv) != 2:
           print("Provide the path to example CSV file")
           sys.exit(1)
       main(sys.argv[1])
   ```
   
   The error is thrown in the Apache Arrow Rust implementation: https://github.com/apache/arrow-rs/blob/eddef43d1cb46c1287da187ea1d86b0e1dc35a13/arrow-buffer/src/buffer/scalar.rs#L138
   ```rust
   let align = std::mem::align_of::<T>();
   let is_aligned = buffer.as_ptr().align_offset(align) == 0;
   
   match buffer.deallocation() {
       Deallocation::Standard(_) => assert!(
           is_aligned,
           "Memory pointer is not aligned with the specified scalar type"
       ),
       Deallocation::Custom(_, _) =>
           assert!(is_aligned, "Memory pointer from external source (e.g, FFI) is not aligned with the specified scalar type. Before importing buffer through FFI, please make sure the allocation is aligned."),
   }
   ```
   In my environment, depending on the CSV column, e.g. `c1` (`c2`), `align` is `4` (`8`), while `buffer.as_ptr().align_offset(align)` is always `3` (`7`); `arrow-rs` requires it to be `0`.
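   Those values mean the buffer addresses sit exactly one byte past a 4-byte (respectively 8-byte) boundary: `align_offset` returns how many bytes the pointer must advance to become aligned, so an address one byte past a boundary needs 3 more bytes to reach a multiple of 4 and 7 more to reach a multiple of 8. A minimal, self-contained sketch of that arithmetic, using the same `align_offset` test as the arrow-rs snippet; `is_aligned_for` and `Aligned8` are hypothetical names for illustration, not arrow-rs APIs.

   ```rust
   use std::mem::align_of;

   /// Hypothetical helper mirroring the test in arrow-rs `ScalarBuffer::new`:
   /// a byte pointer can be used as *const T only if align_offset returns 0.
   fn is_aligned_for<T>(ptr: *const u8) -> bool {
       ptr.align_offset(align_of::<T>()) == 0
   }

   /// Backing storage whose first byte is guaranteed to be 8-byte aligned.
   #[repr(C, align(8))]
   struct Aligned8([u8; 32]);

   fn main() {
       let storage = Aligned8([0; 32]);
       let base = storage.0.as_ptr();
       // A pointer one byte past an 8-byte boundary.
       let off_by_one = storage.0[1..].as_ptr();

       assert!(is_aligned_for::<i32>(base));
       assert!(is_aligned_for::<i64>(base));

       // Bytes still needed to reach the next 4- and 8-byte boundary.
       println!("align 4 -> offset {}", off_by_one.align_offset(4)); // prints "align 4 -> offset 3"
       println!("align 8 -> offset {}", off_by_one.align_offset(8)); // prints "align 8 -> offset 7"

       assert!(!is_aligned_for::<i32>(off_by_one));
       assert!(!is_aligned_for::<i64>(off_by_one));
   }
   ```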
   
   ### Component(s)
   
   FlightRPC


-- 
This is an automated message from the Apache Git Service.