leekeiabstraction commented on code in PR #452: URL: https://github.com/apache/fluss-rust/pull/452#discussion_r3037172619
########## bindings/elixir/test/integration/log_table_test.exs: ########## @@ -0,0 +1,407 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +defmodule Fluss.Integration.LogTableTest do + use ExUnit.Case, async: false + + alias Fluss.Test.Cluster + + @moduletag :integration + + @database "fluss" + + setup_all do + case Cluster.ensure_started() do + {:ok, servers} -> + config = Fluss.Config.new(servers) + + # Wait for cluster to be fully ready (connection + admin working) + {conn, admin} = connect_with_retry(config, 90) + + %{conn: conn, admin: admin, config: config} + + {:error, reason} -> + raise "Failed to start Fluss cluster: #{reason}" + end + end + + describe "append and scan" do + test "append rows and scan with log scanner", %{conn: conn, admin: admin} do + table_name = "ex_test_append_and_scan_#{:rand.uniform(100_000)}" + cleanup_table(admin, table_name) + + schema = + Fluss.Schema.build() + |> Fluss.Schema.column("c1", :int) + |> Fluss.Schema.column("c2", :string) + |> Fluss.Schema.build!() + + descriptor = Fluss.TableDescriptor.new!(schema) + :ok = Fluss.Admin.create_table(admin, @database, table_name, descriptor, false) + + table = Fluss.Table.get!(conn, @database, table_name) + writer = Fluss.AppendWriter.new!(table) + 
+ # Append 6 rows + for {c1, c2} <- [{1, "a1"}, {2, "a2"}, {3, "a3"}, {4, "a4"}, {5, "a5"}, {6, "a6"}] do + {:ok, _} = Fluss.AppendWriter.append(writer, [c1, c2]) + end + + :ok = Fluss.AppendWriter.flush(writer) + + # Scan all records + scanner = Fluss.LogScanner.new!(table) + :ok = Fluss.LogScanner.subscribe(scanner, 0, Fluss.earliest_offset()) + + records = poll_records(scanner, 6) + + assert length(records) == 6 + + sorted = Enum.sort_by(records, fn r -> r[:row][:c1] end) + + for {record, i} <- Enum.with_index(sorted, 1) do + assert record[:row][:c1] == i + assert record[:row][:c2] == "a#{i}" + assert record[:change_type] == :append_only + end + + # Unsubscribe should not error + :ok = Fluss.LogScanner.unsubscribe(scanner, 0) + + cleanup_table(admin, table_name) + end + + test "append with nil values", %{conn: conn, admin: admin} do + table_name = "ex_test_append_nil_#{:rand.uniform(100_000)}" + cleanup_table(admin, table_name) + + schema = + Fluss.Schema.build() + |> Fluss.Schema.column("id", :int) + |> Fluss.Schema.column("name", :string) + |> Fluss.Schema.build!() + + descriptor = Fluss.TableDescriptor.new!(schema) + :ok = Fluss.Admin.create_table(admin, @database, table_name, descriptor, false) + + table = Fluss.Table.get!(conn, @database, table_name) + writer = Fluss.AppendWriter.new!(table) + + {:ok, _} = Fluss.AppendWriter.append(writer, [1, nil]) + {:ok, _} = Fluss.AppendWriter.append(writer, [2, "present"]) + :ok = Fluss.AppendWriter.flush(writer) + + scanner = Fluss.LogScanner.new!(table) + :ok = Fluss.LogScanner.subscribe(scanner, 0, Fluss.earliest_offset()) + + records = poll_records(scanner, 2) + assert length(records) == 2 + + sorted = Enum.sort_by(records, fn r -> r[:row][:id] end) + assert Enum.at(sorted, 0)[:row][:name] == nil + assert Enum.at(sorted, 1)[:row][:name] == "present" + + cleanup_table(admin, table_name) + end + end + + describe "multiple data types" do + test "int, bigint, float, double, string, boolean", %{conn: conn, admin: admin} 
do Review Comment: Are these datatypes exhaustive? Should we also test tinyint and smallint? ########## bindings/elixir/native/fluss_nif/src/log_scanner.rs: ########## @@ -0,0 +1,170 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::atoms::{self, to_nif_err}; +use crate::row_convert; +use crate::table::TableResource; +use crate::RUNTIME; +use fluss::client::LogScanner; +use fluss::metadata::Column; +use fluss::record::ChangeType; +use rustler::env::OwnedEnv; +use rustler::types::LocalPid; +use rustler::{Atom, Encoder, Env, ResourceArc}; +use std::collections::HashMap; +use std::time::Duration; + +pub struct LogScannerResource { + pub scanner: LogScanner, + pub columns: Vec<Column>, +} + +impl std::panic::RefUnwindSafe for LogScannerResource {} + +#[rustler::resource_impl] +impl rustler::Resource for LogScannerResource {} + +#[rustler::nif] +fn log_scanner_new( + table: ResourceArc<TableResource>, +) -> Result<ResourceArc<LogScannerResource>, rustler::Error> { + let _guard = RUNTIME.enter(); + let (scanner, columns) = table.with_table(|t| { + let scanner = t + .new_scan() + .create_log_scanner() + .map_err(to_nif_err)?; + Ok((scanner, t.get_table_info().schema.columns().to_vec())) + })?; + 
Ok(ResourceArc::new(LogScannerResource { scanner, columns })) +} + +#[rustler::nif(schedule = "DirtyIo")] +fn log_scanner_subscribe( + scanner: ResourceArc<LogScannerResource>, + bucket: i32, + offset: i64, +) -> Result<Atom, rustler::Error> { + RUNTIME + .block_on(scanner.scanner.subscribe(bucket, offset)) + .map_err(to_nif_err)?; + Ok(atoms::ok()) +} + +#[rustler::nif(schedule = "DirtyIo")] +fn log_scanner_subscribe_buckets( + scanner: ResourceArc<LogScannerResource>, + bucket_offsets: Vec<(i32, i64)>, +) -> Result<Atom, rustler::Error> { + let map: HashMap<i32, i64> = bucket_offsets.into_iter().collect(); + RUNTIME + .block_on(scanner.scanner.subscribe_buckets(&map)) + .map_err(to_nif_err)?; + Ok(atoms::ok()) +} + +#[rustler::nif(schedule = "DirtyIo")] +fn log_scanner_unsubscribe( + scanner: ResourceArc<LogScannerResource>, + bucket: i32, +) -> Result<Atom, rustler::Error> { + RUNTIME + .block_on(scanner.scanner.unsubscribe(bucket)) + .map_err(to_nif_err)?; + Ok(atoms::ok()) +} + +#[rustler::nif] +fn log_scanner_poll( + env: Env, + scanner: ResourceArc<LogScannerResource>, + timeout_ms: u64, +) -> Atom { + let pid = env.pid(); + let scanner = scanner.clone(); + + std::thread::spawn(move || { + let result = RUNTIME.block_on(scanner.scanner.poll(Duration::from_millis(timeout_ms))); + send_poll_result(&pid, result, &scanner.columns); + }); + + atoms::ok() +} + +fn send_poll_result( + pid: &LocalPid, + result: Result<fluss::record::ScanRecords, fluss::error::Error>, + columns: &[Column], +) { + let mut msg_env = OwnedEnv::new(); + + match result { + Ok(scan_records) => { + let _ = msg_env.send_and_clear(pid, |env| { + let records = encode_scan_records(env, scan_records, columns); + (atoms::fluss_records(), records).encode(env) + }); + } + Err(e) => { + let _ = msg_env.send_and_clear(pid, |env| { + (atoms::fluss_poll_error(), e.to_string()).encode(env) + }); + } + } +} + +fn encode_scan_records<'a>( + env: Env<'a>, + scan_records: fluss::record::ScanRecords, + 
columns: &[Column], +) -> rustler::Term<'a> { + let column_atoms = row_convert::intern_column_atoms(env, columns); + let mut result = Vec::new(); + + for record in scan_records { + let row_map = match row_convert::row_to_term(env, record.row(), columns, &column_atoms) { + Ok(m) => m, + Err(_) => continue, Review Comment: Should we log error here? ########## bindings/elixir/native/fluss_nif/src/log_scanner.rs: ########## @@ -0,0 +1,170 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::atoms::{self, to_nif_err}; +use crate::row_convert; +use crate::table::TableResource; +use crate::RUNTIME; +use fluss::client::LogScanner; +use fluss::metadata::Column; +use fluss::record::ChangeType; +use rustler::env::OwnedEnv; +use rustler::types::LocalPid; +use rustler::{Atom, Encoder, Env, ResourceArc}; +use std::collections::HashMap; +use std::time::Duration; + +pub struct LogScannerResource { + pub scanner: LogScanner, + pub columns: Vec<Column>, +} + +impl std::panic::RefUnwindSafe for LogScannerResource {} + +#[rustler::resource_impl] +impl rustler::Resource for LogScannerResource {} + +#[rustler::nif] +fn log_scanner_new( + table: ResourceArc<TableResource>, +) -> Result<ResourceArc<LogScannerResource>, rustler::Error> { + let _guard = RUNTIME.enter(); + let (scanner, columns) = table.with_table(|t| { + let scanner = t + .new_scan() + .create_log_scanner() + .map_err(to_nif_err)?; + Ok((scanner, t.get_table_info().schema.columns().to_vec())) + })?; + Ok(ResourceArc::new(LogScannerResource { scanner, columns })) +} + +#[rustler::nif(schedule = "DirtyIo")] +fn log_scanner_subscribe( + scanner: ResourceArc<LogScannerResource>, + bucket: i32, + offset: i64, +) -> Result<Atom, rustler::Error> { + RUNTIME + .block_on(scanner.scanner.subscribe(bucket, offset)) + .map_err(to_nif_err)?; + Ok(atoms::ok()) +} + +#[rustler::nif(schedule = "DirtyIo")] +fn log_scanner_subscribe_buckets( + scanner: ResourceArc<LogScannerResource>, + bucket_offsets: Vec<(i32, i64)>, +) -> Result<Atom, rustler::Error> { + let map: HashMap<i32, i64> = bucket_offsets.into_iter().collect(); + RUNTIME + .block_on(scanner.scanner.subscribe_buckets(&map)) + .map_err(to_nif_err)?; + Ok(atoms::ok()) +} + +#[rustler::nif(schedule = "DirtyIo")] +fn log_scanner_unsubscribe( + scanner: ResourceArc<LogScannerResource>, + bucket: i32, +) -> Result<Atom, rustler::Error> { + RUNTIME + .block_on(scanner.scanner.unsubscribe(bucket)) + .map_err(to_nif_err)?; + Ok(atoms::ok()) +} + 
+#[rustler::nif] +fn log_scanner_poll( + env: Env, + scanner: ResourceArc<LogScannerResource>, + timeout_ms: u64, +) -> Atom { + let pid = env.pid(); + let scanner = scanner.clone(); + + std::thread::spawn(move || { Review Comment: Should we consider using a thread pool instead of spawning OS threads? ``` 5. std::thread::spawn in log_scanner_poll (log_scanner.rs:83): Each poll spawns a new OS thread. For a high-frequency consumer calling poll in a loop, this creates thread churn. Consider spawning the blocking work onto the tokio runtime instead: RUNTIME.spawn(async move { ... }); 5. This would reuse the existing thread pool. The PR description mentions avoiding dirty schedulers for streaming — the same argument applies to raw thread::spawn. ``` ########## bindings/elixir/mix.exs: ########## @@ -0,0 +1,61 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +defmodule Fluss.MixProject do + use Mix.Project + + @version "0.1.0" + + def project do + [ + app: :fluss, + version: @version, + elixir: "~> 1.15", + start_permanent: Mix.env() == :prod, + elixirc_paths: elixirc_paths(Mix.env()), + deps: deps(), + description: "Elixir client for Apache Fluss", + package: package() + ] + end + + def application do + [ + extra_applications: [:logger] + ] + end + + defp elixirc_paths(:test), do: ["lib", "test/support"] + defp elixirc_paths(_), do: ["lib"] + + defp deps do Review Comment: Is there automated licence check that we adopt for hex deps? ########## bindings/elixir/native/fluss_nif/Cargo.toml: ########## @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "fluss_nif" +version = "0.1.0" Review Comment: Should be `0.2.0` ########## bindings/elixir/native/fluss_nif/Cargo.toml: ########## @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "fluss_nif" +version = "0.1.0" +edition = "2024" Review Comment: Should be 2026 ########## Cargo.toml: ########## @@ -28,7 +28,7 @@ keywords = ["fluss", "streaming-storage", "datalake"] [workspace] resolver = "2" -members = ["crates/fluss", "crates/examples", "bindings/python", "bindings/cpp"] +members = ["crates/fluss", "crates/examples", "bindings/python", "bindings/cpp", "bindings/elixir/native/fluss_nif"] Review Comment: Should we organise elixir binding structure to be consistent with others? e.g. `bindings/elixir`? ########## bindings/elixir/lib/fluss/table_descriptor.ex: ########## @@ -0,0 +1,54 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +defmodule Fluss.TableDescriptor do + @moduledoc """ + Descriptor for creating a Fluss table. + + Options: `:bucket_count`, `:properties` (list of `{key, value}` string tuples). + + ## Examples + + Fluss.TableDescriptor.new!(schema) + Fluss.TableDescriptor.new!(schema, bucket_count: 3) + + """ + + alias Fluss.Native + + @type t :: reference() + + @spec new!(Fluss.Schema.t(), keyword()) :: t() + def new!(schema, opts \\ []) do + result = + cond do + Keyword.has_key?(opts, :bucket_count) -> Review Comment: Claude review comment: ``` 3. TableDescriptor.new! only applies the first matching option (table_descriptor.ex:49-58): The cond chain means you can't specify both bucket_count and properties simultaneously. Consider supporting combined options or documenting the mutual exclusivity. ``` ########## bindings/elixir/native/fluss_nif/src/row_convert.rs: ########## @@ -0,0 +1,256 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::str::FromStr; + +use fluss::metadata::{Column, DataType}; +use fluss::row::{Date, Decimal, GenericRow, InternalRow, Time, TimestampLtz, TimestampNtz}; +use rustler::types::binary::NewBinary; +use rustler::{Encoder, Env, Term}; + +use crate::atoms; + +pub fn intern_column_atoms<'a>(env: Env<'a>, columns: &[Column]) -> Vec<rustler::Atom> { + columns + .iter() + .map(|col| rustler::Atom::from_str(env, col.name()).expect("valid atom")) + .collect() +} Review Comment: Relying on Claude here for reviews as my knowledge in Elixir / Erlang is very limited. The following comment seems valid, we should at least document this pitfall. ``` 1. Atom.from_str can exhaust the BEAM atom table (row_convert.rs:31): rustler::Atom::from_str(env, col.name()).expect("valid atom") 1. Atoms are never garbage collected. If column names come from user-controlled schemas, repeatedly creating new tables with unique column names could eventually crash the VM (default limit: ~1M atoms). This is a well-known Erlang/Elixir pitfall. Consider documenting this limitation, or converting column names to strings instead of atoms for the row map keys. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
