atharvalade commented on code in PR #2886:
URL: https://github.com/apache/iggy/pull/2886#discussion_r2900579334


##########
core/connectors/sinks/clickhouse_sink/src/binary.rs:
##########
@@ -0,0 +1,1095 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! RowBinary / RowBinaryWithDefaults byte serialization.
+//!
+//! Follows the ClickHouse binary format specification:
+//! <https://clickhouse.com/docs/en/interfaces/formats#rowbinary>
+//!
+//! Key layout rules:
+//! - All integers are **little-endian**.
+//! - Strings are prefixed with an **unsigned LEB128 varint** length.
+//! - `Nullable(T)`: 1-byte null marker (`0x01` = null, `0x00` = not null)
+//!   followed by T bytes when not null.
+//! - `RowBinaryWithDefaults`: each top-level column is preceded by a 1-byte
+//!   flag (`0x01` = use server DEFAULT, `0x00` = value follows).
+
+use crate::schema::{ChType, Column};
+use iggy_connector_sdk::Error;
+use simd_json::OwnedValue;
+use simd_json::prelude::{TypedScalarValue, ValueAsArray, ValueAsObject};
+use tracing::error;
+
+// ─── Public API 
──────────────────────────────────────────────────────────────
+
+/// Serialise one message (a JSON object) as a RowBinaryWithDefaults row into `buf`.
+///
+/// Columns are written in schema order. A column whose field is absent or JSON
+/// null and whose `has_default` is true gets only the DEFAULT prefix (`0x01`).
+/// Otherwise `0x00` is written, then the value or `write_zero_or_null`'s output.
+/// Returns `Error::InvalidRecord` for non-object input; on error `buf` may hold a partial row.
+pub fn serialize_row(
+    value: &OwnedValue,
+    columns: &[Column],
+    buf: &mut Vec<u8>,
+) -> Result<(), Error> {
+    let obj = value.as_object().ok_or_else(|| {
+        error!("RowBinary: message payload is not a JSON object");
+        Error::InvalidRecord
+    })?;
+
+    for col in columns {
+        let field_value = obj.get(col.name.as_str());
+
+        // RowBinaryWithDefaults prefix byte: 0x01 = use DEFAULT, 0x00 = value follows
+        let is_null_or_absent = field_value.map(|v| 
v.is_null()).unwrap_or(true);
+        if is_null_or_absent && col.has_default {
+            buf.push(0x01); // use DEFAULT
+            continue;
+        }
+        buf.push(0x00); // value follows
+
+        match field_value {
+            Some(v) if !v.is_null() => serialize_value(v, &col.ch_type, buf)?,
+            _ => {
+                // Field is absent or null and no DEFAULT applies: defer to
+                // write_zero_or_null (presumably NULL if Nullable, else an error).
+                write_zero_or_null(&col.ch_type, buf, &col.name)?;
+            }
+        }
+    }
+    Ok(())
+}
+
+// ─── Core recursive serializer 
────────────────────────────────────────────────
+
+pub(crate) fn serialize_value(
+    value: &OwnedValue,
+    ch_type: &ChType,
+    buf: &mut Vec<u8>,
+) -> Result<(), Error> {
+    match ch_type {
+        // ── Nullable 
─────────────────────────────────────────────────────────
+        ChType::Nullable(inner) => {
+            if value.is_null() {
+                buf.push(0x01); // null
+            } else {
+                buf.push(0x00); // not null
+                serialize_value(value, inner, buf)?;
+            }
+        }
+
+        // ── String 
───────────────────────────────────────────────────────────
+        ChType::String => {
+            let s = coerce_to_string(value)?;
+            write_string(s.as_bytes(), buf);
+        }
+        ChType::FixedString(n) => {
+            let s = coerce_to_string(value)?;
+            let bytes = s.as_bytes();
+            // Pad or truncate to exactly n bytes
+            let mut fixed = vec![0u8; *n];
+            let copy_len = bytes.len().min(*n);
+            fixed[..copy_len].copy_from_slice(&bytes[..copy_len]);
+            buf.extend_from_slice(&fixed);
+        }
+
+        // ── Integers 
─────────────────────────────────────────────────────────
+        ChType::Int8 => buf.push(coerce_i64(value)? as i8 as u8),
+        ChType::Int16 => buf.extend_from_slice(&(coerce_i64(value)? as 
i16).to_le_bytes()),
+        ChType::Int32 => buf.extend_from_slice(&(coerce_i64(value)? as 
i32).to_le_bytes()),
+        ChType::Int64 => 
buf.extend_from_slice(&coerce_i64(value)?.to_le_bytes()),
+        ChType::UInt8 => buf.push(coerce_u64(value)? as u8),
+        ChType::UInt16 => buf.extend_from_slice(&(coerce_u64(value)? as 
u16).to_le_bytes()),
+        ChType::UInt32 => buf.extend_from_slice(&(coerce_u64(value)? as 
u32).to_le_bytes()),
+        ChType::UInt64 => 
buf.extend_from_slice(&coerce_u64(value)?.to_le_bytes()),
+
+        // ── Floats 
───────────────────────────────────────────────────────────
+        ChType::Float32 => {
+            let f = coerce_f64(value)? as f32;
+            buf.extend_from_slice(&f.to_le_bytes());
+        }
+        ChType::Float64 => {
+            let f = coerce_f64(value)?;
+            buf.extend_from_slice(&f.to_le_bytes());
+        }
+
+        // ── Boolean 
──────────────────────────────────────────────────────────
+        ChType::Boolean => {
+            let b = match value {
+                OwnedValue::Static(simd_json::StaticNode::Bool(b)) => *b,
+                OwnedValue::Static(simd_json::StaticNode::I64(n)) => *n != 0,
+                OwnedValue::Static(simd_json::StaticNode::U64(n)) => *n != 0,
+                _ => {
+                    error!("Cannot convert to Boolean: {value:?}");
+                    return Err(Error::InvalidRecord);
+                }
+            };
+            buf.push(b as u8);
+        }
+
+        // ── UUID 
─────────────────────────────────────────────────────────────
+        // ClickHouse stores UUID as two little-endian 64-bit words.
+        // Input: standard hyphenated UUID string 
"xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
+        ChType::Uuid => {
+            let s = coerce_to_string(value)?;
+            let hex: String = s.chars().filter(|c| 
c.is_ascii_hexdigit()).collect();
+            if hex.len() != 32 {
+                error!("Invalid UUID string: {s}");
+                return Err(Error::InvalidRecord);
+            }
+            let bytes = hex::decode(&hex).map_err(|_| {
+                error!("Cannot decode UUID hex: {hex}");
+                Error::InvalidRecord
+            })?;
+            // ClickHouse UUID layout: first 8 bytes reversed, second 8 bytes 
reversed
+            let mut uuid_buf = [0u8; 16];
+            uuid_buf[..8].copy_from_slice(&bytes[..8]);
+            uuid_buf[8..].copy_from_slice(&bytes[8..]);
+            uuid_buf[..8].reverse();
+            uuid_buf[8..].reverse();
+            buf.extend_from_slice(&uuid_buf);
+        }
+
+        // ── Date types 
───────────────────────────────────────────────────────
+        ChType::Date => {
+            // Days since 1970-01-01 as UInt16. Accept integer or "YYYY-MM-DD".
+            let days = coerce_to_days(value)? as u16;
+            buf.extend_from_slice(&days.to_le_bytes());
+        }
+        ChType::Date32 => {
+            let days = coerce_to_days(value)? as i32;
+            buf.extend_from_slice(&days.to_le_bytes());
+        }
+        ChType::DateTime => {
+            // Unix seconds as UInt32. Accept integer or RFC 3339 string.
+            let secs = coerce_to_unix_seconds(value)? as u32;
+            buf.extend_from_slice(&secs.to_le_bytes());
+        }
+        ChType::DateTime64(precision) => {
+            // Unix time scaled by 10^precision as Int64.
+            let secs_f64 = coerce_to_unix_seconds_f64(value)?;
+            let scale = 10i64.pow(*precision as u32) as f64;
+            let scaled = (secs_f64 * scale).round() as i64;
+            buf.extend_from_slice(&scaled.to_le_bytes());
+        }
+
+        // ── Decimal 
──────────────────────────────────────────────────────────
+        ChType::Decimal(precision, scale) => {
+            let f = coerce_f64(value)?;
+            let scale_factor = 10f64.powi(*scale as i32);
+            let int_val = (f * scale_factor).round() as i128;
+            if *precision <= 9 {
+                buf.extend_from_slice(&(int_val as i32).to_le_bytes());
+            } else if *precision <= 18 {
+                buf.extend_from_slice(&(int_val as i64).to_le_bytes());
+            } else {
+                // Int128: two little-endian 64-bit words, low word first
+                let lo = int_val as i64;
+                let hi = (int_val >> 64) as i64;
+                buf.extend_from_slice(&lo.to_le_bytes());
+                buf.extend_from_slice(&hi.to_le_bytes());
+            }
+        }

Review Comment:
   The `as i32` / `as i64` casts on `int_val` are truncating in Rust. If the 
incoming value exceeds the column's declared precision, the high bits are 
silently discarded and the value wraps around, so you'll write wrong data into 
ClickHouse with no error. ClickHouse won't reject it since RowBinary is trusted 
input. Could you add bounds checks before the casts and return `InvalidRecord` 
if the scaled value doesn't fit?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to