Prajwal-banakar commented on code in PR #411:
URL: https://github.com/apache/fluss-rust/pull/411#discussion_r2922326475
##########
crates/fluss/src/client/table/lookup.rs:
##########
@@ -59,32 +61,80 @@ impl LookupResult {
/// `CompactedRow` borrows from this result set and cannot outlive it.
///
/// # Returns
- /// - `Ok(Some(row))`: If exactly one row exists.
- /// - `Ok(None)`: If the result set is empty.
- /// - `Err(Error::UnexpectedError)`: If the result set contains more than
one row.
+ /// - `Ok(rows)` - All rows in the result set.
+ /// - `Err(Error)` - If any row payload is too short to contain a schema
id.
///
pub fn get_single_row(&self) -> Result<Option<CompactedRow<'_>>> {
match self.rows.len() {
0 => Ok(None),
- 1 => Ok(Some(CompactedRow::from_bytes(
- &self.row_type,
- &self.rows[0][SCHEMA_ID_LENGTH..],
- ))),
+ 1 => {
+ let payload =
+ self.rows[0]
+ .get(SCHEMA_ID_LENGTH..)
+ .ok_or_else(|| Error::UnexpectedError {
+ message: format!(
+ "Row payload too short: {} bytes, need at
least {} for schema id",
+ self.rows[0].len(),
+ SCHEMA_ID_LENGTH
+ ),
+ source: None,
+ })?;
+ Ok(Some(CompactedRow::from_bytes(&self.row_type, payload)))
+ }
_ => Err(Error::UnexpectedError {
message: "LookupResult contains multiple rows, use get_rows()
instead".to_string(),
source: None,
}),
}
}
- /// Returns all rows as CompactedRows.
- pub fn get_rows(&self) -> Vec<CompactedRow<'_>> {
+ pub fn get_rows(&self) -> Result<Vec<CompactedRow<'_>>> {
self.rows
.iter()
- // TODO Add schema id check and fetch when implementing prefix
lookup
- .map(|bytes| CompactedRow::from_bytes(&self.row_type,
&bytes[SCHEMA_ID_LENGTH..]))
+ .map(|bytes| {
+ let payload =
+ bytes
+ .get(SCHEMA_ID_LENGTH..)
+ .ok_or_else(|| Error::UnexpectedError {
+ message: format!(
+ "Row payload too short: {} bytes, need at
least {} for schema id",
+ bytes.len(),
+ SCHEMA_ID_LENGTH
+ ),
+ source: None,
+ })?;
+ Ok(CompactedRow::from_bytes(&self.row_type, payload))
+ })
.collect()
}
+ /// Converts all rows in this result into an Arrow [`RecordBatch`].
+ ///
+ /// This is useful for integration with DataFusion or other Arrow-based
tools.
+ ///
+ /// # Returns
+ /// - `Ok(RecordBatch)` - All rows in columnar Arrow format. Returns an
empty
+ /// batch (with the correct schema) if the result set is empty.
+ /// - `Err(Error)` - If the conversion fails.
+ pub fn to_record_batch(&self) -> Result<RecordBatch> {
+ let mut builder = RowAppendRecordBatchBuilder::new(&self.row_type)?;
+
+ for bytes in &self.rows {
+ let payload = bytes.get(SCHEMA_ID_LENGTH..).ok_or_else(|| {
+ Error::RowConvertError {
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]