alamb commented on code in PR #22029:
URL: https://github.com/apache/datafusion/pull/22029#discussion_r3234763331
##########
datafusion/functions/src/strings.rs:
##########
@@ -673,18 +911,123 @@ impl StringViewArrayBuilder {
}
}
+/// [`StringWriter`] for [`StringViewArrayBuilder`].
+///
+/// The writer accumulates the first up-to-12 bytes of a row in a stack
+/// buffer; if the row stays inline-sized, it never touches the data block.
+/// On the first write that would exceed 12 bytes, the stack buffer is
+/// spilled into the builder's in-progress block and subsequent writes go
+/// directly there.
+pub(crate) struct StringViewWriter<'a> {
+ inline_buf: [u8; 12],
+ inline_len: u8,
+ /// `None` while the row fits inline; becomes `Some(start)` (offset of
+ /// the row's first byte in `in_progress`) at first spill.
+ spill_cursor: Option<usize>,
+ builder: &'a mut StringViewArrayBuilder,
+}
+
+impl StringWriter for StringViewWriter<'_> {
+ #[inline]
+ fn write_str(&mut self, s: &str) {
+ let bytes = s.as_bytes();
+ match self.spill_cursor {
+ None => {
+ let inline_len = self.inline_len as usize;
+ let new_len = inline_len + bytes.len();
+ if new_len <= 12 {
+
self.inline_buf[inline_len..new_len].copy_from_slice(bytes);
+ self.inline_len = new_len as u8;
+ } else {
+ // First spill of this row: reserve capacity (which may
+ // flush the current block — safe, no row-data is in it
+ // yet for this row), copy the buffered prefix, then
+ // write the new bytes.
+ self.builder.ensure_long_capacity(new_len as u32);
+ let cursor = self.builder.in_progress.len();
+ self.builder
+ .in_progress
+ .extend_from_slice(&self.inline_buf[..inline_len]);
+ self.builder.in_progress.extend_from_slice(bytes);
+ self.spill_cursor = Some(cursor);
+ }
+ }
+ Some(_) => {
+ self.builder.in_progress.extend_from_slice(bytes);
+ }
+ }
+ }
+
+ #[inline]
+ fn write_char(&mut self, c: char) {
+ let len = c.len_utf8();
+ match self.spill_cursor {
+ None => {
+ let inline_len = self.inline_len as usize;
+ let new_len = inline_len + len;
+ if new_len <= 12 {
+ c.encode_utf8(&mut self.inline_buf[inline_len..new_len]);
+ self.inline_len = new_len as u8;
+ } else {
+ self.builder.ensure_long_capacity(new_len as u32);
+ let cursor = self.builder.in_progress.len();
+ self.builder
+ .in_progress
+ .extend_from_slice(&self.inline_buf[..inline_len]);
+ push_char_to_vec(&mut self.builder.in_progress, c);
+ self.spill_cursor = Some(cursor);
+ }
+ }
+ Some(_) => {
+ push_char_to_vec(&mut self.builder.in_progress, c);
+ }
+ }
+ }
+}
+
+#[inline]
+fn push_char_to_vec(v: &mut Vec<u8>, c: char) {
+ let len = c.len_utf8();
+ let old_len = v.len();
+ v.reserve(len);
+ // SAFETY: we reserved `len` bytes above, write valid UTF-8 into those
+ // bytes, then update the initialized length to include them.
+ unsafe {
+ let dst = v.as_mut_ptr().add(old_len);
+ if len == 1 {
+ *dst = c as u8;
+ } else {
+ c.encode_utf8(std::slice::from_raw_parts_mut(dst, len));
+ }
+ v.set_len(old_len + len);
+ }
+}
+
/// Trait abstracting over the bulk-NULL string array builders.
///
/// Similar to Arrow's `StringLikeArrayBuilder`, this allows generic dispatch
/// over the three string array types (Utf8, LargeUtf8, Utf8View) when the
/// function body is uniform across them.
pub(crate) trait BulkNullStringArrayBuilder {
+ /// Per-builder concrete writer type, exposed as a GAT so generic callers
+ /// can use the inherent (non-`dyn`) writer methods without vtable
+ /// dispatch.
+ type Writer<'a>: StringWriter
+ where
+ Self: 'a;
+
fn append_value(&mut self, value: &str);
Review Comment:
I think it would help if we also added documentation for these methods --
and maybe some commentary about when to use one or the other.
Specifically this context seem valuable and non-obvious when reading the
code for the first time
> (We need two new APIs because append_byte_map vectorizes a lot better than
append_with, so callers that fit the byte-to-byte map pattern should prefer it.)
I think you already have most of these comments on the implementations in
StringViewArrayBuilder, etc -- so maybe you could consolidate the docs in the
trait and update the builders to have a doc link to `BulkNullStringArrayBuilder`
##########
datafusion/functions/src/strings.rs:
##########
@@ -983,6 +1400,174 @@ mod tests {
let _ = builder.finish(None);
}
+ #[test]
+ fn string_view_array_builder_append_with_inline() {
Review Comment:
I wonder if we could reduce the repetition of these tests a bit by
1. Defining the input (as a closure)
2. Defining the expected output
3. Running the same inputs/outputs through all three types of builders
For example something like
```rust
#[test] // would need to do this for StringViewArray, StringArray,
LargeStringArray
fn bulk_string_view_inline() {
let inputs = vec![Some("hello"), Some("world"), None,
Some("0123456789ab")];
let output = build_array<StringViewArrayBuilder>(|builder| {
for s in &inputs {
builder.append_with(|w| w.write_str(s));
}
}
let expected = StringViewArray::from(inputs);
assert_eq!(output, expected)
}
```
I think it would take some fiddling to get the generics right, but once that
was done I think the tests would be easier to understand (much more concise)
and add more coverage
It seems like you did something similar with
`bulk_null_trait_append_with_string` below
##########
datafusion/functions/src/strings.rs:
##########
@@ -673,18 +911,123 @@ impl StringViewArrayBuilder {
}
}
+/// [`StringWriter`] for [`StringViewArrayBuilder`].
+///
+/// The writer accumulates the first up-to-12 bytes of a row in a stack
+/// buffer; if the row stays inline-sized, it never touches the data block.
+/// On the first write that would exceed 12 bytes, the stack buffer is
+/// spilled into the builder's in-progress block and subsequent writes go
+/// directly there.
+pub(crate) struct StringViewWriter<'a> {
Review Comment:
This is definitely something we could consider porting up stream to arrow-rs
🤔
##########
datafusion/functions/src/strings.rs:
##########
@@ -673,18 +911,123 @@ impl StringViewArrayBuilder {
}
}
+/// [`StringWriter`] for [`StringViewArrayBuilder`].
+///
+/// The writer accumulates the first up-to-12 bytes of a row in a stack
+/// buffer; if the row stays inline-sized, it never touches the data block.
+/// On the first write that would exceed 12 bytes, the stack buffer is
+/// spilled into the builder's in-progress block and subsequent writes go
+/// directly there.
+pub(crate) struct StringViewWriter<'a> {
+ inline_buf: [u8; 12],
+ inline_len: u8,
+ /// `None` while the row fits inline; becomes `Some(start)` (offset of
+ /// the row's first byte in `in_progress`) at first spill.
+ spill_cursor: Option<usize>,
+ builder: &'a mut StringViewArrayBuilder,
+}
+
+impl StringWriter for StringViewWriter<'_> {
+ #[inline]
+ fn write_str(&mut self, s: &str) {
+ let bytes = s.as_bytes();
+ match self.spill_cursor {
+ None => {
+ let inline_len = self.inline_len as usize;
+ let new_len = inline_len + bytes.len();
+ if new_len <= 12 {
+
self.inline_buf[inline_len..new_len].copy_from_slice(bytes);
+ self.inline_len = new_len as u8;
+ } else {
+ // First spill of this row: reserve capacity (which may
+ // flush the current block — safe, no row-data is in it
+ // yet for this row), copy the buffered prefix, then
+ // write the new bytes.
+ self.builder.ensure_long_capacity(new_len as u32);
+ let cursor = self.builder.in_progress.len();
+ self.builder
+ .in_progress
+ .extend_from_slice(&self.inline_buf[..inline_len]);
+ self.builder.in_progress.extend_from_slice(bytes);
+ self.spill_cursor = Some(cursor);
+ }
+ }
+ Some(_) => {
+ self.builder.in_progress.extend_from_slice(bytes);
+ }
+ }
+ }
+
+ #[inline]
+ fn write_char(&mut self, c: char) {
+ let len = c.len_utf8();
+ match self.spill_cursor {
+ None => {
+ let inline_len = self.inline_len as usize;
+ let new_len = inline_len + len;
+ if new_len <= 12 {
+ c.encode_utf8(&mut self.inline_buf[inline_len..new_len]);
+ self.inline_len = new_len as u8;
+ } else {
+ self.builder.ensure_long_capacity(new_len as u32);
+ let cursor = self.builder.in_progress.len();
+ self.builder
+ .in_progress
+ .extend_from_slice(&self.inline_buf[..inline_len]);
+ push_char_to_vec(&mut self.builder.in_progress, c);
+ self.spill_cursor = Some(cursor);
+ }
+ }
+ Some(_) => {
+ push_char_to_vec(&mut self.builder.in_progress, c);
+ }
+ }
+ }
+}
+
+#[inline]
+fn push_char_to_vec(v: &mut Vec<u8>, c: char) {
Review Comment:
Did you evaluate a safer version like
```rust
fn push_char_to_vec(v: &mut Vec<u8>, c: char) {
let mut buf = [0u8; 4];
v.extend_from_slice(c.encode_utf8(&mut buf).as_bytes());
}
```
In theory that is another copy, but LLVM might be smart enough to inline /
elide the copy
Maybe some justification about why this was needed would help
##########
datafusion/functions/src/strings.rs:
##########
@@ -627,6 +733,138 @@ impl StringViewArrayBuilder {
self.placeholder_count += 1;
}
+ /// Ensure the in-progress block has room for `length` more bytes,
+ /// flushing the current block and starting a new (doubled) one if not.
+ /// Caller must only invoke this between rows — flushing mid-row would
+ /// orphan partial row data.
+ #[inline]
+ fn ensure_long_capacity(&mut self, length: u32) {
+ let required_cap = self.in_progress.len() + length as usize;
+ if self.in_progress.capacity() < required_cap {
+ self.flush_in_progress();
+ let to_reserve = (length as usize).max(self.next_block_size() as
usize);
+ self.in_progress.reserve(to_reserve);
+ }
+ }
+
+ /// Encode a long-form view referencing `length` bytes already written
+ /// into the in-progress block at `offset`. `prefix_bytes` is the row's
+ /// data slice (or any slice starting with the row's first 4 bytes).
+ ///
+ /// Built inline rather than going through `make_view`, which is
+ /// `[inline(never)]`.
+ #[inline]
+ fn make_long_view(&self, length: u32, offset: u32, prefix_bytes: &[u8]) ->
u128 {
+ let buffer_index: u32 = i32::try_from(self.completed.len())
+ .expect("buffer count exceeds i32::MAX")
+ as u32;
+ ByteView {
+ length,
+ // length > 12, so prefix_bytes has at least 4 bytes.
+ prefix: u32::from_le_bytes(prefix_bytes[..4].try_into().unwrap()),
+ buffer_index,
+ offset,
+ }
+ .into()
+ }
+
+ /// Append a row whose bytes are produced by mapping each byte of `src`
+ /// through `map`, in order. Output length equals `src.len()`.
+ ///
+ /// Because output length is known up front, this is more efficient than
+ /// `append_with` when computing a byte-to-byte mapping.
+ ///
+ /// # Safety
+ ///
+ /// The bytes produced by `map` over `src.iter()`, in order, must form
+ /// valid UTF-8.
+ ///
+ /// # Panics
+ ///
+ /// Panics under the same conditions as [`Self::append_value`]: if
+ /// `src.len()`, the in-progress buffer offset, or the number of completed
+ /// buffers exceeds `i32::MAX`.
+ #[inline]
+ pub unsafe fn append_byte_map<F: FnMut(u8) -> u8>(&mut self, src: &[u8],
mut map: F) {
+ let length: u32 =
+ i32::try_from(src.len()).expect("value length exceeds i32::MAX")
as u32;
+ if length <= 12 {
Review Comment:
this looks similar to
https://docs.rs/arrow/latest/arrow/array/fn.make_view.html - I wonder if we
could use that instead of the replication
##########
datafusion/functions/src/strings.rs:
##########
@@ -538,6 +584,79 @@ pub(crate) const STRING_VIEW_INIT_BLOCK_SIZE: u32 = 8 *
1024;
/// grows to; matches Arrow's `GenericByteViewBuilder` default.
pub(crate) const STRING_VIEW_MAX_BLOCK_SIZE: u32 = 2 * 1024 * 1024;
+/// Append-only writer handed to closures passed to `append_with`.
+pub(crate) trait StringWriter {
+ fn write_str(&mut self, s: &str);
+ fn write_char(&mut self, c: char);
+}
+
+/// [`StringWriter`] for [`GenericStringArrayBuilder`]. Writes go straight to
+/// the value buffer.
+pub(crate) struct GenericStringWriter<'a> {
+ value_buffer: &'a mut MutableBuffer,
+}
+
+impl StringWriter for GenericStringWriter<'_> {
+ #[inline(always)]
+ fn write_str(&mut self, s: &str) {
+ push_bytes_to_mutable_buffer(self.value_buffer, s.as_bytes());
+ }
+
+ #[inline(always)]
+ fn write_char(&mut self, c: char) {
+ push_char_to_mutable_buffer(self.value_buffer, c);
+ }
+}
+
+/// Write `bytes` into `value_buffer`. For repeated small writes,
+/// MutableBuffer::extend_from_slice can be slow (memcpy per call), so we
extend
+/// the buffer here directly and force inlining.
+#[inline(always)]
+fn push_bytes_to_mutable_buffer(value_buffer: &mut MutableBuffer, bytes:
&[u8]) {
+ let n = bytes.len();
+ let old_len = value_buffer.len();
+ value_buffer.reserve(n);
+
+ // SAFETY: we reserved `n` bytes; the source and destination do not alias
+ // because `bytes` was passed in by the caller and `value_buffer` is owned.
+ unsafe {
+ let dst = value_buffer.as_mut_ptr().add(old_len);
+ let src = bytes.as_ptr();
+ match n {
+ 0 => {}
+ 1 => std::ptr::copy_nonoverlapping(src, dst, 1),
+ 2 => std::ptr::copy_nonoverlapping(src, dst, 2),
+ 3 => std::ptr::copy_nonoverlapping(src, dst, 3),
+ 4 => std::ptr::copy_nonoverlapping(src, dst, 4),
+ 5 => std::ptr::copy_nonoverlapping(src, dst, 5),
+ 6 => std::ptr::copy_nonoverlapping(src, dst, 6),
+ 7 => std::ptr::copy_nonoverlapping(src, dst, 7),
+ 8 => std::ptr::copy_nonoverlapping(src, dst, 8),
+ _ => std::ptr::copy_nonoverlapping(src, dst, n),
+ }
+ value_buffer.set_len(old_len + n);
+ }
+}
+
+#[inline(always)]
+fn push_char_to_mutable_buffer(value_buffer: &mut MutableBuffer, c: char) {
+ let len = c.len_utf8();
+ let old_len = value_buffer.len();
+ value_buffer.reserve(len);
+
+ // SAFETY: we reserved `len` bytes above, write valid UTF-8 into those
Review Comment:
same question as above -- can we avoid the unsafe by copying to a local
stack variable first?
##########
datafusion/functions/src/strings.rs:
##########
@@ -627,6 +733,138 @@ impl StringViewArrayBuilder {
self.placeholder_count += 1;
}
+ /// Ensure the in-progress block has room for `length` more bytes,
+ /// flushing the current block and starting a new (doubled) one if not.
+ /// Caller must only invoke this between rows — flushing mid-row would
+ /// orphan partial row data.
+ #[inline]
+ fn ensure_long_capacity(&mut self, length: u32) {
+ let required_cap = self.in_progress.len() + length as usize;
+ if self.in_progress.capacity() < required_cap {
+ self.flush_in_progress();
+ let to_reserve = (length as usize).max(self.next_block_size() as
usize);
+ self.in_progress.reserve(to_reserve);
+ }
+ }
+
+ /// Encode a long-form view referencing `length` bytes already written
+ /// into the in-progress block at `offset`. `prefix_bytes` is the row's
+ /// data slice (or any slice starting with the row's first 4 bytes).
+ ///
+ /// Built inline rather than going through `make_view`, which is
+ /// `[inline(never)]`.
+ #[inline]
+ fn make_long_view(&self, length: u32, offset: u32, prefix_bytes: &[u8]) ->
u128 {
+ let buffer_index: u32 = i32::try_from(self.completed.len())
+ .expect("buffer count exceeds i32::MAX")
+ as u32;
+ ByteView {
+ length,
+ // length > 12, so prefix_bytes has at least 4 bytes.
+ prefix: u32::from_le_bytes(prefix_bytes[..4].try_into().unwrap()),
+ buffer_index,
+ offset,
+ }
+ .into()
+ }
+
+ /// Append a row whose bytes are produced by mapping each byte of `src`
+ /// through `map`, in order. Output length equals `src.len()`.
+ ///
+ /// Because output length is known up front, this is more efficient than
+ /// `append_with` when computing a byte-to-byte mapping.
+ ///
+ /// # Safety
+ ///
+ /// The bytes produced by `map` over `src.iter()`, in order, must form
+ /// valid UTF-8.
+ ///
+ /// # Panics
+ ///
+ /// Panics under the same conditions as [`Self::append_value`]: if
+ /// `src.len()`, the in-progress buffer offset, or the number of completed
+ /// buffers exceeds `i32::MAX`.
+ #[inline]
+ pub unsafe fn append_byte_map<F: FnMut(u8) -> u8>(&mut self, src: &[u8],
mut map: F) {
+ let length: u32 =
+ i32::try_from(src.len()).expect("value length exceeds i32::MAX")
as u32;
+ if length <= 12 {
+ // `iter_mut().zip()` over a stack array is the canonical autovec
+ // pattern.
+ let mut bytes = [0u8; 12];
+ for (d, &b) in bytes[..src.len()].iter_mut().zip(src) {
+ *d = map(b);
+ }
+ self.views.push(make_view(&bytes[..src.len()], 0, 0));
+ return;
+ }
+
+ self.ensure_long_capacity(length);
+
+ let cursor = self.in_progress.len();
+ let offset: u32 = i32::try_from(cursor).expect("offset exceeds
i32::MAX") as u32;
+ self.in_progress.extend(src.iter().map(|&b| map(b)));
+ self.views
+ .push(self.make_long_view(length, offset,
&self.in_progress[cursor..]));
+ }
+
+ /// Append a row whose bytes are produced by `f` calling write methods on
+ /// the supplied [`StringWriter`].
+ ///
+ /// Initial writes accumulate in a 12-byte stack buffer. If the row stays
Review Comment:
this feels like documentation that should go on StringViewWriter (they sort
of already are)
##########
datafusion/functions/src/strings.rs:
##########
@@ -673,18 +911,123 @@ impl StringViewArrayBuilder {
}
}
+/// [`StringWriter`] for [`StringViewArrayBuilder`].
+///
+/// The writer accumulates the first up-to-12 bytes of a row in a stack
+/// buffer; if the row stays inline-sized, it never touches the data block.
+/// On the first write that would exceed 12 bytes, the stack buffer is
+/// spilled into the builder's in-progress block and subsequent writes go
+/// directly there.
+pub(crate) struct StringViewWriter<'a> {
+ inline_buf: [u8; 12],
+ inline_len: u8,
+ /// `None` while the row fits inline; becomes `Some(start)` (offset of
+ /// the row's first byte in `in_progress`) at first spill.
+ spill_cursor: Option<usize>,
+ builder: &'a mut StringViewArrayBuilder,
+}
+
+impl StringWriter for StringViewWriter<'_> {
+ #[inline]
+ fn write_str(&mut self, s: &str) {
+ let bytes = s.as_bytes();
+ match self.spill_cursor {
+ None => {
+ let inline_len = self.inline_len as usize;
+ let new_len = inline_len + bytes.len();
+ if new_len <= 12 {
+
self.inline_buf[inline_len..new_len].copy_from_slice(bytes);
+ self.inline_len = new_len as u8;
+ } else {
+ // First spill of this row: reserve capacity (which may
+ // flush the current block — safe, no row-data is in it
+ // yet for this row), copy the buffered prefix, then
+ // write the new bytes.
+ self.builder.ensure_long_capacity(new_len as u32);
+ let cursor = self.builder.in_progress.len();
+ self.builder
+ .in_progress
+ .extend_from_slice(&self.inline_buf[..inline_len]);
+ self.builder.in_progress.extend_from_slice(bytes);
+ self.spill_cursor = Some(cursor);
+ }
+ }
+ Some(_) => {
+ self.builder.in_progress.extend_from_slice(bytes);
+ }
+ }
+ }
+
+ #[inline]
+ fn write_char(&mut self, c: char) {
+ let len = c.len_utf8();
+ match self.spill_cursor {
+ None => {
+ let inline_len = self.inline_len as usize;
+ let new_len = inline_len + len;
+ if new_len <= 12 {
+ c.encode_utf8(&mut self.inline_buf[inline_len..new_len]);
+ self.inline_len = new_len as u8;
+ } else {
+ self.builder.ensure_long_capacity(new_len as u32);
+ let cursor = self.builder.in_progress.len();
+ self.builder
+ .in_progress
+ .extend_from_slice(&self.inline_buf[..inline_len]);
+ push_char_to_vec(&mut self.builder.in_progress, c);
+ self.spill_cursor = Some(cursor);
+ }
+ }
+ Some(_) => {
+ push_char_to_vec(&mut self.builder.in_progress, c);
+ }
+ }
+ }
+}
+
+#[inline]
+fn push_char_to_vec(v: &mut Vec<u8>, c: char) {
+ let len = c.len_utf8();
+ let old_len = v.len();
+ v.reserve(len);
+ // SAFETY: we reserved `len` bytes above, write valid UTF-8 into those
+ // bytes, then update the initialized length to include them.
+ unsafe {
+ let dst = v.as_mut_ptr().add(old_len);
+ if len == 1 {
+ *dst = c as u8;
+ } else {
+ c.encode_utf8(std::slice::from_raw_parts_mut(dst, len));
+ }
+ v.set_len(old_len + len);
+ }
+}
+
/// Trait abstracting over the bulk-NULL string array builders.
///
/// Similar to Arrow's `StringLikeArrayBuilder`, this allows generic dispatch
/// over the three string array types (Utf8, LargeUtf8, Utf8View) when the
/// function body is uniform across them.
pub(crate) trait BulkNullStringArrayBuilder {
+ /// Per-builder concrete writer type, exposed as a GAT so generic callers
+ /// can use the inherent (non-`dyn`) writer methods without vtable
+ /// dispatch.
+ type Writer<'a>: StringWriter
+ where
+ Self: 'a;
+
fn append_value(&mut self, value: &str);
fn append_placeholder(&mut self);
+ fn append_with<F>(&mut self, f: F)
+ where
+ F: for<'a> FnOnce(&mut Self::Writer<'a>);
+ unsafe fn append_byte_map<F: FnMut(u8) -> u8>(&mut self, src: &[u8], map:
F);
Review Comment:
I also think documenting carefully what is needed to use this `unsafe` API
safely is important (namely that the output bytes must form valid UTF-8 I think)
##########
datafusion/functions/src/strings.rs:
##########
@@ -493,6 +493,52 @@ impl<O: OffsetSizeTrait> GenericStringArrayBuilder<O> {
self.placeholder_count += 1;
}
+ /// Append a row whose bytes are produced by mapping each byte of `src`
Review Comment:
see comments below -- I think it would be good to consolidate the
documentation on the BulkNullStringArrayBuilder trait and leave a pointer here
-- that way the documentation won't drift.
Replication is ok too, but I think it would be better to avoid if possible
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]