Copilot commented on code in PR #787: URL: https://github.com/apache/sedona-db/pull/787#discussion_r3146967999
########## rust/sedona-raster-gdal/src/gdal_common.rs: ########## @@ -0,0 +1,591 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use sedona_gdal::dataset::Dataset; +use sedona_gdal::errors::GdalError; +use sedona_gdal::gdal::Gdal; +use sedona_gdal::gdal_dyn_bindgen::{GDAL_OF_RASTER, GDAL_OF_READONLY, GDAL_OF_VERBOSE_ERROR}; +use sedona_gdal::mem::MemDatasetBuilder; +use sedona_gdal::raster::types::DatasetOptions; +use sedona_gdal::raster::types::GdalDataType; + +use sedona_raster::traits::RasterRef; +use sedona_schema::raster::{BandDataType, StorageType}; + +use datafusion_common::{ + arrow_datafusion_err, exec_datafusion_err, exec_err, DataFusionError, Result, +}; + +/// Execute a closure with a reference to the global [`Gdal`] handle, +/// converting initialization errors to [`DataFusionError`]. +pub(crate) fn with_gdal<F, R>(f: F) -> Result<R> +where + F: FnOnce(&Gdal) -> Result<R>, +{ + match sedona_gdal::global::with_global_gdal(f) { + Ok(inner_result) => inner_result, + Err(init_err) => Err(DataFusionError::External(Box::new(init_err))), + } +} + +/// Converts a BandDataType to the corresponding GDAL data type. 
+pub fn band_data_type_to_gdal(band_type: &BandDataType) -> GdalDataType { + match band_type { + BandDataType::UInt8 => GdalDataType::UInt8, + BandDataType::Int8 => GdalDataType::Int8, + BandDataType::UInt16 => GdalDataType::UInt16, + BandDataType::Int16 => GdalDataType::Int16, + BandDataType::UInt32 => GdalDataType::UInt32, + BandDataType::Int32 => GdalDataType::Int32, + BandDataType::UInt64 => GdalDataType::UInt64, + BandDataType::Int64 => GdalDataType::Int64, + BandDataType::Float32 => GdalDataType::Float32, + BandDataType::Float64 => GdalDataType::Float64, + } +} + +/// Converts a GDAL data type to the corresponding BandDataType. +pub fn gdal_to_band_data_type(gdal_type: GdalDataType) -> Result<BandDataType> { + match gdal_type { + GdalDataType::UInt8 => Ok(BandDataType::UInt8), + GdalDataType::Int8 => Ok(BandDataType::Int8), + GdalDataType::UInt16 => Ok(BandDataType::UInt16), + GdalDataType::Int16 => Ok(BandDataType::Int16), + GdalDataType::UInt32 => Ok(BandDataType::UInt32), + GdalDataType::Int32 => Ok(BandDataType::Int32), + GdalDataType::UInt64 => Ok(BandDataType::UInt64), + GdalDataType::Int64 => Ok(BandDataType::Int64), + GdalDataType::Float32 => Ok(BandDataType::Float32), + GdalDataType::Float64 => Ok(BandDataType::Float64), + _ => Err(DataFusionError::NotImplemented(format!( + "GDAL data type {:?} is not supported", + gdal_type + ))), + } +} + +/// Returns the byte size of a GDAL data type. +pub fn gdal_type_byte_size(gdal_type: GdalDataType) -> usize { + gdal_type.byte_size() +} + +/// Interprets nodata bytes according to the band data type and returns as f64. +/// +/// Returns None if the nodata_bytes is None or has incorrect length. Review Comment: `bytes_to_f64` docstring says it returns `None` on missing/incorrect length, but the function signature returns `Result<f64>` and it returns an error on invalid lengths. Please update the doc comment to match the actual behavior (or change the API if `Option` was intended). 
```suggestion /// Interprets nodata bytes according to the band data type and returns the value as `f64`. /// /// Returns an error if `bytes` does not have the expected length for `band_type`. ``` ########## rust/sedona-raster-gdal/src/gdal_dataset_provider.rs: ########## @@ -0,0 +1,786 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::convert::TryInto; +use std::{cell::RefCell, marker::PhantomData, num::NonZeroUsize, rc::Rc}; + +use datafusion_common::config::ConfigOptions; +use datafusion_common::{ + arrow_datafusion_err, exec_datafusion_err, exec_err, DataFusionError, Result, +}; + +use sedona_gdal::dataset::Dataset; +use sedona_gdal::gdal::Gdal; +use sedona_gdal::geo_transform::{GeoTransform, GeoTransformEx}; +use sedona_gdal::raster::types::GdalDataType; + +use sedona_common::SedonaOptions; +use sedona_raster::traits::RasterRef; +use sedona_schema::raster::{BandDataType, StorageType}; + +use crate::gdal_common::{ + band_data_type_to_gdal, bytes_to_f64, convert_gdal_err, normalize_outdb_source_path, + open_gdal_dataset, raster_ref_to_gdal_empty, raster_ref_to_gdal_mem, +}; + +/// A GDAL dataset constructed from a `RasterRef`. 
+/// +/// This struct is designed to keep any backing GDAL datasets alive for as long as +/// the returned `dataset` might reference them. +/// +/// Field semantics by raster storage layout: +/// +/// 1) **In-db bands only** +/// - `dataset`: a GDAL **MEM** dataset containing all bands. +/// - `gdal_mem_source`: `None` (the MEM dataset is already stored in `dataset`). +/// - `_gdal_outdb_sources`: empty. +/// +/// 2) **Out-db bands only** +/// - `dataset`: a GDAL **VRT** dataset sized like the target raster, with each VRT band +/// sourcing from an external dataset band. +/// - `gdal_mem_source`: `None`. +/// - `_gdal_outdb_sources`: contains the opened external GDAL datasets (kept alive via `Rc`). +/// (There may be duplicates if multiple bands reference the same URL; that is fine.) +/// +/// 3) **Mixed in-db + out-db bands** +/// - `dataset`: a GDAL **VRT** dataset with band order matching the target raster. +/// In-db bands source from a MEM dataset; out-db bands source from external datasets. +/// - `gdal_mem_source`: `Some(MEM dataset)` containing only the in-db bands, in the same order +/// as they appear in the target raster. +/// - `_gdal_outdb_sources`: contains the opened external GDAL datasets (kept alive via `Rc`). +pub(crate) struct RasterDataset<'a> { + /// The dataset to use for further GDAL operations. + dataset: Rc<Dataset>, + /// A MEM dataset holding in-db band data when `dataset` is a VRT that references it. + _gdal_mem_source: Option<Rc<Dataset>>, + /// External datasets referenced by the VRT; kept alive for the lifetime of this struct. + _gdal_outdb_sources: Vec<Rc<Dataset>>, + /// Binds this dataset's lifetime to the borrowed source raster. + _source_raster: PhantomData<&'a dyn RasterRef>, +} + +impl<'a> RasterDataset<'a> { + /// Return a reference to the underlying GDAL dataset. + pub(crate) fn as_dataset(&self) -> &Dataset { + &self.dataset + } +} + +thread_local! { + /// Thread-local lazily-initialized `GDALDatasetCache`. 
+ static TL_GDAL_DATASET_CACHE: RefCell<Option<Rc<GDALDatasetCache>>> = const { RefCell::new(None) }; +} + +const DEFAULT_GDAL_SOURCE_CACHE_SIZE: usize = 32; +const DEFAULT_GDAL_VRT_CACHE_SIZE: usize = 32; + +pub(crate) fn configure_thread_local_options( + gdal: &Gdal, + config_options: Option<&ConfigOptions>, +) -> Result<()> { + // Set frequently requested GDAL config options as thread-local options to eliminate the + // need for acquiring configs from global config or environment variable, which is very + // likely to result in heavy contention in multi-threaded environments. + let cpl_debug_enabled = config_options + .and_then(|c| { + c.extensions + .get::<SedonaOptions>() + .map(|opts| opts.gdal.cpl_debug) + }) + .unwrap_or(false); + let cpl_debug_value = if cpl_debug_enabled { "ON" } else { "OFF" }; + + let thread_local_options = [ + ("CPL_DEBUG", cpl_debug_value), + ("OSR_DEFAULT_AXIS_MAPPING_STRATEGY", "AUTHORITY_COMPLIANT"), + ("GDAL_VALIDATE_CREATION_OPTIONS", "YES"), + ("CHECK_WITH_INVERT_PROJ", "NO"), + ("GDAL_FORCE_CACHING", "NO"), + ("GDAL_ENABLE_READ_WRITE_MUTEX", "YES"), + ]; + + for (key, value) in thread_local_options { + gdal.set_thread_local_config_option(key, value) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + } + Ok(()) +} + +/// Get or create the thread-local `GDALDatasetCache`. +pub(crate) fn thread_local_cache() -> Result<Rc<GDALDatasetCache>> { + TL_GDAL_DATASET_CACHE.with(|cell| { + let mut opt = cell.borrow_mut(); + if let Some(rc) = opt.as_ref() { + Ok(Rc::clone(rc)) + } else { + let cache = Rc::new(GDALDatasetCache::try_new( + DEFAULT_GDAL_SOURCE_CACHE_SIZE, + DEFAULT_GDAL_VRT_CACHE_SIZE, + )?); + *opt = Some(Rc::clone(&cache)); + Ok(cache) + } + }) +} + +/// Build a `GDALDatasetProvider` from an explicit GDAL handle and the thread-local cache. 
+pub(crate) fn thread_local_provider<'a>(gdal: &'a Gdal) -> Result<GDALDatasetProvider<'a>> { + Ok(GDALDatasetProvider::new(gdal, thread_local_cache()?)) +} + +#[derive(Hash, Eq, PartialEq)] +struct OutDbSourceKey { + path: String, + open_options: Option<Vec<String>>, +} + +impl OutDbSourceKey { + pub fn new(path: &str, options: Option<&[&str]>) -> Self { + let normalized_path = normalize_outdb_source_path(path); + let open_options = options + .filter(|opts| !opts.is_empty()) + .map(|opts| opts.iter().map(|s| (*s).to_string()).collect()); + + Self { + path: normalized_path, + open_options, + } + } +} + +pub(crate) struct GDALDatasetCache { + cached_sources: RefCell<lru::LruCache<OutDbSourceKey, Rc<Dataset>>>, + cached_vrts: RefCell<lru::LruCache<VrtKey, Rc<CachedVrt>>>, +} + +impl GDALDatasetCache { + pub fn try_new(cache_capacity: usize, vrt_cache_capacity: usize) -> Result<Self> { + let Some(cap) = NonZeroUsize::new(cache_capacity) else { + return Err(DataFusionError::Configuration( + "Raster source cache size should be greater than 0".to_string(), + )); + }; + let Some(vrt_cap) = NonZeroUsize::new(vrt_cache_capacity) else { + return Err(DataFusionError::Configuration( + "Raster VRT cache size should be greater than 0".to_string(), + )); + }; + let cache = lru::LruCache::new(cap); + let vrt_cache = lru::LruCache::new(vrt_cap); + Ok(Self { + cached_sources: RefCell::new(cache), + cached_vrts: RefCell::new(vrt_cache), + }) + } + + fn get_or_create_outdb_source( + &self, + gdal: &Gdal, + path: &str, + options: Option<&[&str]>, + ) -> Result<Rc<Dataset>> { + let cache_key = OutDbSourceKey::new(path, options); + let mut cache = self.cached_sources.borrow_mut(); + if let Some(cached_source) = cache.get(&cache_key) { + Ok(Rc::clone(cached_source)) + } else { + let source_dataset = open_gdal_dataset(gdal, path, options)?; + let rc_dataset = Rc::new(source_dataset); + cache.put(cache_key, Rc::clone(&rc_dataset)); + Ok(rc_dataset) + } + } + + fn build_vrt_from_sources<R: 
RasterRef + ?Sized>( + &self, + gdal: &Gdal, + raster: &R, + gdal_mem_source: Option<&Rc<Dataset>>, + ) -> Result<(Rc<Dataset>, Vec<Rc<Dataset>>)> { + let metadata = raster.metadata(); + let bands = raster.bands(); + let num_bands = bands.len(); + + let width = metadata.width() as i32; + let height = metadata.height() as i32; + let mut vrt = gdal + .create_vrt(metadata.width() as usize, metadata.height() as usize) + .map_err(convert_gdal_err)?; + + let geotransform = [ + metadata.upper_left_x(), + metadata.scale_x(), + metadata.skew_x(), + metadata.upper_left_y(), + metadata.skew_y(), + metadata.scale_y(), + ]; + vrt.set_geo_transform(&geotransform) + .map_err(convert_gdal_err)?; + if let Some(crs) = raster.crs() { + vrt.set_projection(crs).map_err(convert_gdal_err)?; + } + + let mut outdb_sources: Vec<Rc<Dataset>> = Vec::new(); + let mut mem_band_index: usize = 1; + + for i in 1..=num_bands { + let band = bands.band(i).map_err(|e| arrow_datafusion_err!(e))?; + let band_metadata = band.metadata(); + let band_type = band_metadata.data_type()?; + let gdal_type = band_data_type_to_gdal(&band_type); + if matches!(gdal_type, GdalDataType::Unknown) { + return Err(DataFusionError::NotImplemented(format!( + "Band data type {:?} is not supported by this GDAL build", + band_type + ))); + } + + vrt.add_band(gdal_type, None).map_err(convert_gdal_err)?; + let vrt_band = vrt.rasterband(i).map_err(convert_gdal_err)?; + + if let Some(nodata_bytes) = band_metadata.nodata_value() { + match band_type { + BandDataType::UInt64 => { + let nodata_bytes: [u8; 8] = nodata_bytes.try_into().map_err(|_| { + exec_datafusion_err!("Invalid nodata byte length for UInt64") + })?; + let nodata = u64::from_le_bytes(nodata_bytes); + vrt_band + .set_no_data_value_u64(Some(nodata)) + .map_err(convert_gdal_err)?; + } + BandDataType::Int64 => { + let nodata_bytes: [u8; 8] = nodata_bytes.try_into().map_err(|_| { + exec_datafusion_err!("Invalid nodata byte length for Int64") + })?; + let nodata = 
i64::from_le_bytes(nodata_bytes); + vrt_band + .set_no_data_value_i64(Some(nodata)) + .map_err(convert_gdal_err)?; + } + _ => { + let nodata = bytes_to_f64(nodata_bytes, &band_type)?; + vrt_band + .set_no_data_value(nodata) + .map_err(convert_gdal_err)?; + } + } + } + + match band_metadata.storage_type()? { + StorageType::OutDbRef => { + let url = band_metadata.outdb_url().ok_or_else(|| { + exec_datafusion_err!("Band {} is out-db but missing outdb_url", i) + })?; + let source_band_num: usize = band_metadata + .outdb_band_id() + .ok_or_else(|| { + exec_datafusion_err!("Band {} is out-db but missing band_id", i) + })? + .try_into() + .map_err(|_| { + exec_datafusion_err!("Band {} out-db band_id is too large", i) + })?; + + let source_dataset = self.get_or_create_outdb_source(gdal, url, None)?; + + // If GDALGetGeoTransform(hdsSrc, ogt) fails, we falls back to (0, 1, 0, 0, 0, -1), Review Comment: Typo in comment: "we falls back" → "we fall back". ```suggestion // If GDALGetGeoTransform(hdsSrc, ogt) fails, we fall back to (0, 1, 0, 0, 0, -1), ``` ########## rust/sedona-raster-gdal/src/gdal_dataset_provider.rs: ########## @@ -0,0 +1,786 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::convert::TryInto; +use std::{cell::RefCell, marker::PhantomData, num::NonZeroUsize, rc::Rc}; + +use datafusion_common::config::ConfigOptions; +use datafusion_common::{ + arrow_datafusion_err, exec_datafusion_err, exec_err, DataFusionError, Result, +}; + +use sedona_gdal::dataset::Dataset; +use sedona_gdal::gdal::Gdal; +use sedona_gdal::geo_transform::{GeoTransform, GeoTransformEx}; +use sedona_gdal::raster::types::GdalDataType; + +use sedona_common::SedonaOptions; +use sedona_raster::traits::RasterRef; +use sedona_schema::raster::{BandDataType, StorageType}; + +use crate::gdal_common::{ + band_data_type_to_gdal, bytes_to_f64, convert_gdal_err, normalize_outdb_source_path, + open_gdal_dataset, raster_ref_to_gdal_empty, raster_ref_to_gdal_mem, +}; + +/// A GDAL dataset constructed from a `RasterRef`. +/// +/// This struct is designed to keep any backing GDAL datasets alive for as long as +/// the returned `dataset` might reference them. +/// +/// Field semantics by raster storage layout: +/// +/// 1) **In-db bands only** +/// - `dataset`: a GDAL **MEM** dataset containing all bands. +/// - `gdal_mem_source`: `None` (the MEM dataset is already stored in `dataset`). +/// - `_gdal_outdb_sources`: empty. +/// +/// 2) **Out-db bands only** +/// - `dataset`: a GDAL **VRT** dataset sized like the target raster, with each VRT band +/// sourcing from an external dataset band. +/// - `gdal_mem_source`: `None`. +/// - `_gdal_outdb_sources`: contains the opened external GDAL datasets (kept alive via `Rc`). +/// (There may be duplicates if multiple bands reference the same URL; that is fine.) +/// +/// 3) **Mixed in-db + out-db bands** +/// - `dataset`: a GDAL **VRT** dataset with band order matching the target raster. +/// In-db bands source from a MEM dataset; out-db bands source from external datasets. +/// - `gdal_mem_source`: `Some(MEM dataset)` containing only the in-db bands, in the same order +/// as they appear in the target raster. 
+/// - `_gdal_outdb_sources`: contains the opened external GDAL datasets (kept alive via `Rc`). +pub(crate) struct RasterDataset<'a> { + /// The dataset to use for further GDAL operations. + dataset: Rc<Dataset>, + /// A MEM dataset holding in-db band data when `dataset` is a VRT that references it. + _gdal_mem_source: Option<Rc<Dataset>>, + /// External datasets referenced by the VRT; kept alive for the lifetime of this struct. + _gdal_outdb_sources: Vec<Rc<Dataset>>, + /// Binds this dataset's lifetime to the borrowed source raster. + _source_raster: PhantomData<&'a dyn RasterRef>, +} + +impl<'a> RasterDataset<'a> { + /// Return a reference to the underlying GDAL dataset. + pub(crate) fn as_dataset(&self) -> &Dataset { + &self.dataset + } +} + +thread_local! { + /// Thread-local lazily-initialized `GDALDatasetCache`. + static TL_GDAL_DATASET_CACHE: RefCell<Option<Rc<GDALDatasetCache>>> = const { RefCell::new(None) }; +} + +const DEFAULT_GDAL_SOURCE_CACHE_SIZE: usize = 32; +const DEFAULT_GDAL_VRT_CACHE_SIZE: usize = 32; + +pub(crate) fn configure_thread_local_options( + gdal: &Gdal, + config_options: Option<&ConfigOptions>, +) -> Result<()> { + // Set frequently requested GDAL config options as thread-local options to eliminate the + // need for acquiring configs from global config or environment variable, which is very + // likely to result in heavy contention in multi-threaded environments. 
+ let cpl_debug_enabled = config_options + .and_then(|c| { + c.extensions + .get::<SedonaOptions>() + .map(|opts| opts.gdal.cpl_debug) + }) + .unwrap_or(false); + let cpl_debug_value = if cpl_debug_enabled { "ON" } else { "OFF" }; + + let thread_local_options = [ + ("CPL_DEBUG", cpl_debug_value), + ("OSR_DEFAULT_AXIS_MAPPING_STRATEGY", "AUTHORITY_COMPLIANT"), + ("GDAL_VALIDATE_CREATION_OPTIONS", "YES"), + ("CHECK_WITH_INVERT_PROJ", "NO"), + ("GDAL_FORCE_CACHING", "NO"), + ("GDAL_ENABLE_READ_WRITE_MUTEX", "YES"), + ]; + + for (key, value) in thread_local_options { + gdal.set_thread_local_config_option(key, value) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + } + Ok(()) +} + +/// Get or create the thread-local `GDALDatasetCache`. +pub(crate) fn thread_local_cache() -> Result<Rc<GDALDatasetCache>> { + TL_GDAL_DATASET_CACHE.with(|cell| { + let mut opt = cell.borrow_mut(); + if let Some(rc) = opt.as_ref() { + Ok(Rc::clone(rc)) + } else { + let cache = Rc::new(GDALDatasetCache::try_new( + DEFAULT_GDAL_SOURCE_CACHE_SIZE, + DEFAULT_GDAL_VRT_CACHE_SIZE, + )?); + *opt = Some(Rc::clone(&cache)); + Ok(cache) + } + }) +} + +/// Build a `GDALDatasetProvider` from an explicit GDAL handle and the thread-local cache. 
+pub(crate) fn thread_local_provider<'a>(gdal: &'a Gdal) -> Result<GDALDatasetProvider<'a>> { + Ok(GDALDatasetProvider::new(gdal, thread_local_cache()?)) +} + +#[derive(Hash, Eq, PartialEq)] +struct OutDbSourceKey { + path: String, + open_options: Option<Vec<String>>, +} + +impl OutDbSourceKey { + pub fn new(path: &str, options: Option<&[&str]>) -> Self { + let normalized_path = normalize_outdb_source_path(path); + let open_options = options + .filter(|opts| !opts.is_empty()) + .map(|opts| opts.iter().map(|s| (*s).to_string()).collect()); + + Self { + path: normalized_path, + open_options, + } + } +} + +pub(crate) struct GDALDatasetCache { + cached_sources: RefCell<lru::LruCache<OutDbSourceKey, Rc<Dataset>>>, + cached_vrts: RefCell<lru::LruCache<VrtKey, Rc<CachedVrt>>>, +} + +impl GDALDatasetCache { + pub fn try_new(cache_capacity: usize, vrt_cache_capacity: usize) -> Result<Self> { + let Some(cap) = NonZeroUsize::new(cache_capacity) else { + return Err(DataFusionError::Configuration( + "Raster source cache size should be greater than 0".to_string(), + )); + }; + let Some(vrt_cap) = NonZeroUsize::new(vrt_cache_capacity) else { + return Err(DataFusionError::Configuration( + "Raster VRT cache size should be greater than 0".to_string(), + )); + }; + let cache = lru::LruCache::new(cap); + let vrt_cache = lru::LruCache::new(vrt_cap); + Ok(Self { + cached_sources: RefCell::new(cache), + cached_vrts: RefCell::new(vrt_cache), + }) + } + + fn get_or_create_outdb_source( + &self, + gdal: &Gdal, + path: &str, + options: Option<&[&str]>, + ) -> Result<Rc<Dataset>> { + let cache_key = OutDbSourceKey::new(path, options); + let mut cache = self.cached_sources.borrow_mut(); + if let Some(cached_source) = cache.get(&cache_key) { + Ok(Rc::clone(cached_source)) + } else { + let source_dataset = open_gdal_dataset(gdal, path, options)?; + let rc_dataset = Rc::new(source_dataset); + cache.put(cache_key, Rc::clone(&rc_dataset)); + Ok(rc_dataset) + } + } + + fn build_vrt_from_sources<R: 
RasterRef + ?Sized>( + &self, + gdal: &Gdal, + raster: &R, + gdal_mem_source: Option<&Rc<Dataset>>, + ) -> Result<(Rc<Dataset>, Vec<Rc<Dataset>>)> { + let metadata = raster.metadata(); + let bands = raster.bands(); + let num_bands = bands.len(); + + let width = metadata.width() as i32; + let height = metadata.height() as i32; + let mut vrt = gdal + .create_vrt(metadata.width() as usize, metadata.height() as usize) + .map_err(convert_gdal_err)?; + + let geotransform = [ + metadata.upper_left_x(), + metadata.scale_x(), + metadata.skew_x(), + metadata.upper_left_y(), + metadata.skew_y(), + metadata.scale_y(), + ]; + vrt.set_geo_transform(&geotransform) + .map_err(convert_gdal_err)?; + if let Some(crs) = raster.crs() { + vrt.set_projection(crs).map_err(convert_gdal_err)?; + } + + let mut outdb_sources: Vec<Rc<Dataset>> = Vec::new(); + let mut mem_band_index: usize = 1; + + for i in 1..=num_bands { + let band = bands.band(i).map_err(|e| arrow_datafusion_err!(e))?; + let band_metadata = band.metadata(); + let band_type = band_metadata.data_type()?; + let gdal_type = band_data_type_to_gdal(&band_type); + if matches!(gdal_type, GdalDataType::Unknown) { + return Err(DataFusionError::NotImplemented(format!( + "Band data type {:?} is not supported by this GDAL build", + band_type + ))); + } + + vrt.add_band(gdal_type, None).map_err(convert_gdal_err)?; + let vrt_band = vrt.rasterband(i).map_err(convert_gdal_err)?; + + if let Some(nodata_bytes) = band_metadata.nodata_value() { + match band_type { + BandDataType::UInt64 => { + let nodata_bytes: [u8; 8] = nodata_bytes.try_into().map_err(|_| { + exec_datafusion_err!("Invalid nodata byte length for UInt64") + })?; + let nodata = u64::from_le_bytes(nodata_bytes); + vrt_band + .set_no_data_value_u64(Some(nodata)) + .map_err(convert_gdal_err)?; + } + BandDataType::Int64 => { + let nodata_bytes: [u8; 8] = nodata_bytes.try_into().map_err(|_| { + exec_datafusion_err!("Invalid nodata byte length for Int64") + })?; + let nodata = 
i64::from_le_bytes(nodata_bytes); + vrt_band + .set_no_data_value_i64(Some(nodata)) + .map_err(convert_gdal_err)?; + } + _ => { + let nodata = bytes_to_f64(nodata_bytes, &band_type)?; + vrt_band + .set_no_data_value(nodata) + .map_err(convert_gdal_err)?; + } + } + } + + match band_metadata.storage_type()? { + StorageType::OutDbRef => { + let url = band_metadata.outdb_url().ok_or_else(|| { + exec_datafusion_err!("Band {} is out-db but missing outdb_url", i) + })?; + let source_band_num: usize = band_metadata + .outdb_band_id() + .ok_or_else(|| { + exec_datafusion_err!("Band {} is out-db but missing band_id", i) + })? + .try_into() + .map_err(|_| { + exec_datafusion_err!("Band {} out-db band_id is too large", i) + })?; + + let source_dataset = self.get_or_create_outdb_source(gdal, url, None)?; + + // If GDALGetGeoTransform(hdsSrc, ogt) fails, we falls back to (0, 1, 0, 0, 0, -1), + // which is the identity transform. + let src_geo_transform = source_dataset + .geo_transform() + .unwrap_or([0.0, 1.0, 0.0, 0.0, 0.0, -1.0]); + let (src_w, src_h) = source_dataset.raster_size(); + + // Compute source and destination windows for the VRT simple source. The VRT usually only + // clip a small portion of the source dataset. + let Some((src_window, dst_window)) = compute_vrt_simple_source_windows( + &geotransform, + (width, height), + &src_geo_transform, + (src_w as i32, src_h as i32), + )? + else { + // No spatial overlap between the target raster and the source dataset. + // Leave the VRT band as nodata. + continue; + }; + + let source_band = source_dataset + .rasterband(source_band_num) + .map_err(convert_gdal_err)?; + + vrt_band + // Avoid passing per-source NODATA to VRT simple sources; some GDAL builds + // warn that NODATA isn't supported for neighbour-sampled simple sources + // on virtual datasources. We set band-level NODATA via set_no_data_value. 
+ .add_simple_source(&source_band, src_window, dst_window, None, None) + .map_err(convert_gdal_err)?; + + outdb_sources.push(source_dataset); + } + StorageType::InDb => { + let mem_dataset = gdal_mem_source + .as_ref() + .expect("in-db dataset should exist"); + let source_band = mem_dataset + .rasterband(mem_band_index) + .map_err(convert_gdal_err)?; + mem_band_index += 1; + + vrt_band + .add_simple_source( + &source_band, + (0, 0, width, height), + (0, 0, width, height), + None, + None, + ) + .map_err(convert_gdal_err)?; + } + } + } + + Ok((Rc::new(vrt.as_dataset()), outdb_sources)) + } +} + +#[derive(Clone)] +pub(crate) struct GDALDatasetProvider<'a> { + gdal: &'a Gdal, + cache: Rc<GDALDatasetCache>, +} + +impl<'a> GDALDatasetProvider<'a> { + pub fn new(gdal: &'a Gdal, cache: Rc<GDALDatasetCache>) -> Self { + Self { gdal, cache } + } + + pub fn raster_ref_to_gdal<'b, R: RasterRef + ?Sized>( + &self, + raster: &'b R, + ) -> Result<RasterDataset<'b>> { + let bands = raster.bands(); + let num_bands = bands.len(); + + if num_bands == 0 { + let dataset = raster_ref_to_gdal_empty(self.gdal, raster)?; + return Ok(RasterDataset { + dataset: Rc::new(dataset), + _gdal_mem_source: None, + _gdal_outdb_sources: Vec::new(), + _source_raster: PhantomData, + }); + } + + let mut indb_band_indices = Vec::with_capacity(num_bands); + let mut has_outdb = false; + for i in 1..=num_bands { + let band = bands.band(i).map_err(|e| arrow_datafusion_err!(e))?; + match band.metadata().storage_type()? { + StorageType::InDb => indb_band_indices.push(i), + StorageType::OutDbRef => has_outdb = true, + } + } + + let mut gdal_mem_source = if !indb_band_indices.is_empty() { + Some(Rc::new(unsafe { + raster_ref_to_gdal_mem(self.gdal, raster, &indb_band_indices)? 
+ })) + } else { + None + }; + + if !has_outdb { + let dataset = gdal_mem_source.take().expect("in-db dataset should exist"); + return Ok(RasterDataset { + dataset, + _gdal_mem_source: None, + _gdal_outdb_sources: Vec::new(), + _source_raster: PhantomData, + }); + } + + if indb_band_indices.is_empty() { + let vrt_key = VrtKey::from_raster(raster)?; + if let Some(cached) = self.cache.cached_vrts.borrow_mut().get(&vrt_key) { + return Ok(RasterDataset { + dataset: Rc::clone(&cached.dataset), + _gdal_mem_source: None, + _gdal_outdb_sources: cached.outdb_sources.clone(), + _source_raster: PhantomData, + }); + } + + let (dataset, outdb_sources) = + self.cache.build_vrt_from_sources(self.gdal, raster, None)?; + let cached = Rc::new(CachedVrt { + dataset: Rc::clone(&dataset), + outdb_sources: outdb_sources.clone(), + }); + self.cache + .cached_vrts + .borrow_mut() + .put(vrt_key, Rc::clone(&cached)); + + return Ok(RasterDataset { + dataset, + _gdal_mem_source: None, + _gdal_outdb_sources: outdb_sources, + _source_raster: PhantomData, + }); + } + + let (dataset, outdb_sources) = + self.cache + .build_vrt_from_sources(self.gdal, raster, gdal_mem_source.as_ref())?; + + Ok(RasterDataset { + dataset, + _gdal_mem_source: gdal_mem_source, + _gdal_outdb_sources: outdb_sources, + _source_raster: PhantomData, + }) + } +} + +#[derive(Hash, Eq, PartialEq)] +struct VrtBandKey { + storage_type: StorageType, + data_type: BandDataType, + nodata_bits: Option<u64>, + outdb_url: Option<String>, + outdb_band_id: Option<u32>, +} + +#[derive(Hash, Eq, PartialEq)] +struct VrtKey { + width: u64, + height: u64, + geotransform_bits: [u64; 6], + crs: Option<String>, + bands: Vec<VrtBandKey>, +} + +impl VrtKey { + fn from_raster<R: RasterRef + ?Sized>(raster: &R) -> Result<Self> { + let metadata = raster.metadata(); + let bands = raster.bands(); + let num_bands = bands.len(); + + let geotransform = [ + metadata.upper_left_x(), + metadata.scale_x(), + metadata.skew_x(), + metadata.upper_left_y(), + 
metadata.skew_y(), + metadata.scale_y(), + ]; + let geotransform_bits = geotransform.map(f64::to_bits); + + let mut band_keys = Vec::with_capacity(num_bands); + for i in 1..=num_bands { + let band = bands.band(i).map_err(|e| arrow_datafusion_err!(e))?; + let band_metadata = band.metadata(); + let band_type = band_metadata.data_type()?; + let nodata_bits = match (band_metadata.nodata_value(), band_type) { + (Some(bytes), BandDataType::UInt64) => { + let bytes: [u8; 8] = bytes.try_into().map_err(|_| { + exec_datafusion_err!("Invalid nodata byte length for UInt64") + })?; + Some(f64::to_bits(u64::from_le_bytes(bytes) as f64)) + } + (Some(bytes), BandDataType::Int64) => { + let bytes: [u8; 8] = bytes.try_into().map_err(|_| { + exec_datafusion_err!("Invalid nodata byte length for Int64") + })?; + Some(f64::to_bits(i64::from_le_bytes(bytes) as f64)) Review Comment: `VrtKey` encodes UInt64/Int64 nodata by casting to `f64` and storing `f64::to_bits(...)`. This loses precision for values whose magnitude exceeds 2^53, so distinct nodata values can map to the same cache key (e.g. `u64::MAX` and `u64::MAX - 1` both round to 2^64 as `f64`), leading to incorrect VRT cache reuse. Consider storing the raw 64-bit integer bits (or the original nodata byte vector) in the key for 64-bit integer band types. ```suggestion Some(u64::from_le_bytes(bytes)) } (Some(bytes), BandDataType::Int64) => { let bytes: [u8; 8] = bytes.try_into().map_err(|_| { exec_datafusion_err!("Invalid nodata byte length for Int64") })?; Some(u64::from_le_bytes(bytes)) ``` ########## rust/sedona-raster-gdal/src/gdal_dataset_provider.rs: ########## @@ -0,0 +1,786 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::convert::TryInto; +use std::{cell::RefCell, marker::PhantomData, num::NonZeroUsize, rc::Rc}; + +use datafusion_common::config::ConfigOptions; +use datafusion_common::{ + arrow_datafusion_err, exec_datafusion_err, exec_err, DataFusionError, Result, +}; + +use sedona_gdal::dataset::Dataset; +use sedona_gdal::gdal::Gdal; +use sedona_gdal::geo_transform::{GeoTransform, GeoTransformEx}; +use sedona_gdal::raster::types::GdalDataType; + +use sedona_common::SedonaOptions; +use sedona_raster::traits::RasterRef; +use sedona_schema::raster::{BandDataType, StorageType}; + +use crate::gdal_common::{ + band_data_type_to_gdal, bytes_to_f64, convert_gdal_err, normalize_outdb_source_path, + open_gdal_dataset, raster_ref_to_gdal_empty, raster_ref_to_gdal_mem, +}; + +/// A GDAL dataset constructed from a `RasterRef`. +/// +/// This struct is designed to keep any backing GDAL datasets alive for as long as +/// the returned `dataset` might reference them. +/// +/// Field semantics by raster storage layout: +/// +/// 1) **In-db bands only** +/// - `dataset`: a GDAL **MEM** dataset containing all bands. +/// - `gdal_mem_source`: `None` (the MEM dataset is already stored in `dataset`). +/// - `_gdal_outdb_sources`: empty. +/// +/// 2) **Out-db bands only** +/// - `dataset`: a GDAL **VRT** dataset sized like the target raster, with each VRT band +/// sourcing from an external dataset band. +/// - `gdal_mem_source`: `None`. +/// - `_gdal_outdb_sources`: contains the opened external GDAL datasets (kept alive via `Rc`). 
+/// (There may be duplicates if multiple bands reference the same URL; that is fine.) +/// +/// 3) **Mixed in-db + out-db bands** +/// - `dataset`: a GDAL **VRT** dataset with band order matching the target raster. +/// In-db bands source from a MEM dataset; out-db bands source from external datasets. +/// - `gdal_mem_source`: `Some(MEM dataset)` containing only the in-db bands, in the same order +/// as they appear in the target raster. +/// - `_gdal_outdb_sources`: contains the opened external GDAL datasets (kept alive via `Rc`). +pub(crate) struct RasterDataset<'a> { + /// The dataset to use for further GDAL operations. + dataset: Rc<Dataset>, + /// A MEM dataset holding in-db band data when `dataset` is a VRT that references it. + _gdal_mem_source: Option<Rc<Dataset>>, + /// External datasets referenced by the VRT; kept alive for the lifetime of this struct. + _gdal_outdb_sources: Vec<Rc<Dataset>>, + /// Binds this dataset's lifetime to the borrowed source raster. + _source_raster: PhantomData<&'a dyn RasterRef>, +} + +impl<'a> RasterDataset<'a> { + /// Return a reference to the underlying GDAL dataset. + pub(crate) fn as_dataset(&self) -> &Dataset { + &self.dataset + } +} + +thread_local! { + /// Thread-local lazily-initialized `GDALDatasetCache`. + static TL_GDAL_DATASET_CACHE: RefCell<Option<Rc<GDALDatasetCache>>> = const { RefCell::new(None) }; +} + +const DEFAULT_GDAL_SOURCE_CACHE_SIZE: usize = 32; +const DEFAULT_GDAL_VRT_CACHE_SIZE: usize = 32; + +pub(crate) fn configure_thread_local_options( + gdal: &Gdal, + config_options: Option<&ConfigOptions>, +) -> Result<()> { + // Set frequently requested GDAL config options as thread-local options to eliminate the + // need for acquiring configs from global config or environment variable, which is very + // likely to result in heavy contention in multi-threaded environments. 
+ let cpl_debug_enabled = config_options + .and_then(|c| { + c.extensions + .get::<SedonaOptions>() + .map(|opts| opts.gdal.cpl_debug) + }) + .unwrap_or(false); + let cpl_debug_value = if cpl_debug_enabled { "ON" } else { "OFF" }; + + let thread_local_options = [ + ("CPL_DEBUG", cpl_debug_value), + ("OSR_DEFAULT_AXIS_MAPPING_STRATEGY", "AUTHORITY_COMPLIANT"), + ("GDAL_VALIDATE_CREATION_OPTIONS", "YES"), + ("CHECK_WITH_INVERT_PROJ", "NO"), + ("GDAL_FORCE_CACHING", "NO"), + ("GDAL_ENABLE_READ_WRITE_MUTEX", "YES"), + ]; + + for (key, value) in thread_local_options { + gdal.set_thread_local_config_option(key, value) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + } + Ok(()) +} + +/// Get or create the thread-local `GDALDatasetCache`. +pub(crate) fn thread_local_cache() -> Result<Rc<GDALDatasetCache>> { + TL_GDAL_DATASET_CACHE.with(|cell| { + let mut opt = cell.borrow_mut(); + if let Some(rc) = opt.as_ref() { + Ok(Rc::clone(rc)) + } else { + let cache = Rc::new(GDALDatasetCache::try_new( + DEFAULT_GDAL_SOURCE_CACHE_SIZE, + DEFAULT_GDAL_VRT_CACHE_SIZE, + )?); + *opt = Some(Rc::clone(&cache)); + Ok(cache) + } + }) +} + +/// Build a `GDALDatasetProvider` from an explicit GDAL handle and the thread-local cache. 
+pub(crate) fn thread_local_provider<'a>(gdal: &'a Gdal) -> Result<GDALDatasetProvider<'a>> { + Ok(GDALDatasetProvider::new(gdal, thread_local_cache()?)) +} + +#[derive(Hash, Eq, PartialEq)] +struct OutDbSourceKey { + path: String, + open_options: Option<Vec<String>>, +} + +impl OutDbSourceKey { + pub fn new(path: &str, options: Option<&[&str]>) -> Self { + let normalized_path = normalize_outdb_source_path(path); + let open_options = options + .filter(|opts| !opts.is_empty()) + .map(|opts| opts.iter().map(|s| (*s).to_string()).collect()); + + Self { + path: normalized_path, + open_options, + } + } +} + +pub(crate) struct GDALDatasetCache { + cached_sources: RefCell<lru::LruCache<OutDbSourceKey, Rc<Dataset>>>, + cached_vrts: RefCell<lru::LruCache<VrtKey, Rc<CachedVrt>>>, +} + +impl GDALDatasetCache { + pub fn try_new(cache_capacity: usize, vrt_cache_capacity: usize) -> Result<Self> { + let Some(cap) = NonZeroUsize::new(cache_capacity) else { + return Err(DataFusionError::Configuration( + "Raster source cache size should be greater than 0".to_string(), + )); + }; + let Some(vrt_cap) = NonZeroUsize::new(vrt_cache_capacity) else { + return Err(DataFusionError::Configuration( + "Raster VRT cache size should be greater than 0".to_string(), + )); + }; + let cache = lru::LruCache::new(cap); + let vrt_cache = lru::LruCache::new(vrt_cap); + Ok(Self { + cached_sources: RefCell::new(cache), + cached_vrts: RefCell::new(vrt_cache), + }) + } + + fn get_or_create_outdb_source( + &self, + gdal: &Gdal, + path: &str, + options: Option<&[&str]>, + ) -> Result<Rc<Dataset>> { + let cache_key = OutDbSourceKey::new(path, options); + let mut cache = self.cached_sources.borrow_mut(); + if let Some(cached_source) = cache.get(&cache_key) { + Ok(Rc::clone(cached_source)) + } else { + let source_dataset = open_gdal_dataset(gdal, path, options)?; + let rc_dataset = Rc::new(source_dataset); + cache.put(cache_key, Rc::clone(&rc_dataset)); + Ok(rc_dataset) + } + } + + fn build_vrt_from_sources<R: 
RasterRef + ?Sized>( + &self, + gdal: &Gdal, + raster: &R, + gdal_mem_source: Option<&Rc<Dataset>>, + ) -> Result<(Rc<Dataset>, Vec<Rc<Dataset>>)> { + let metadata = raster.metadata(); + let bands = raster.bands(); + let num_bands = bands.len(); + + let width = metadata.width() as i32; + let height = metadata.height() as i32; + let mut vrt = gdal + .create_vrt(metadata.width() as usize, metadata.height() as usize) Review Comment: `width`/`height` are cast from `u64` to `i32` with `as`, without bounds checking. For rasters whose width or height exceeds `i32::MAX`, the cast silently truncates to the low 32 bits and can even yield negative values. This can produce incorrect VRT windows or cause panics later. Add an explicit range check and return a DataFusion error if dimensions exceed supported GDAL/i32 limits (or refactor to use wider integers throughout the window computations). Note that `exec_err!` already wraps its error in `Err(...)`, so the `?` after `.map_err(|_| exec_err!(...))` in the suggestion below will not compile. Use `exec_datafusion_err!` instead; it is already imported in this file. ```suggestion let metadata_width = metadata.width(); let metadata_height = metadata.height(); let width: i32 = metadata_width.try_into().map_err(|_| { exec_err!( "Raster width {} exceeds supported GDAL/i32 limit {}", metadata_width, i32::MAX ) })?; let height: i32 = metadata_height.try_into().map_err(|_| { exec_err!( "Raster height {} exceeds supported GDAL/i32 limit {}", metadata_height, i32::MAX ) })?; let vrt_width: usize = metadata_width.try_into().map_err(|_| { exec_err!( "Raster width {} exceeds supported GDAL/usize limit", metadata_width ) })?; let vrt_height: usize = metadata_height.try_into().map_err(|_| { exec_err!( "Raster height {} exceeds supported GDAL/usize limit", metadata_height ) })?; let mut vrt = gdal .create_vrt(vrt_width, vrt_height) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
