This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch server-config-refactor in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 32a11c48efbff4a4b14dc2cfca16b6069f5f4fd8 Author: Hubert Gruszecki <[email protected]> AuthorDate: Mon Feb 23 12:42:09 2026 +0100 refactor(configs): move ServerConfig types from server to configs crate Config types were coupled to the server binary, forcing dependents to pull in the entire server crate. Move them to core/configs/ with a thin re-export facade so existing server imports stay unchanged. Add TODO for extracting iggy_types leaf crate to break the configs <-> iggy_common cycle and eliminate the MemoryPoolConfigOther duplicate. --- Cargo.lock | 17 +- DEPENDENCIES.md | 2 +- core/common/src/alloc/memory_pool.rs | 3 + core/configs/Cargo.toml | 8 + core/configs/src/lib.rs | 6 +- .../src/server_config}/cache_indexes.rs | 0 .../src/server_config}/cluster.rs | 0 .../src/server_config}/defaults.rs | 26 +- .../src/server_config}/displays.rs | 8 +- .../configs => configs/src/server_config}/http.rs | 0 .../configs => configs/src/server_config}/mod.rs | 0 .../configs => configs/src/server_config}/quic.rs | 0 .../src/server_config}/server.rs | 19 +- core/configs/src/server_config/sharding.rs | 261 +++++++++ .../src/server_config}/system.rs | 2 +- .../configs => configs/src/server_config}/tcp.rs | 0 .../src/server_config}/validators.rs | 41 +- .../src/server_config}/websocket.rs | 0 core/server/Cargo.toml | 5 - core/server/src/bootstrap.rs | 2 +- core/server/src/{configs/mod.rs => configs.rs} | 18 +- core/server/src/configs/sharding.rs | 637 --------------------- core/server/src/lib.rs | 1 + core/server/src/main.rs | 2 +- core/server/src/server_error.rs | 30 +- core/server/src/shard_allocator.rs | 379 ++++++++++++ core/server/src/streaming/segments/mod.rs | 2 +- 27 files changed, 717 insertions(+), 752 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 47adb0aa1..39a1f2e61 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1936,10 +1936,18 @@ name = "configs" version = "0.1.0" dependencies = [ "configs_derive", + "derive_more", + "err_trail", "figment", + 
"iggy_common", + "jsonwebtoken", "serde", "serde_json", + "serde_with", + "static-toml", + "strum", "tracing", + "tungstenite", ] [[package]] @@ -3161,7 +3169,6 @@ dependencies = [ "atomic", "pear", "serde", - "serde_json", "toml 0.8.23", "uncased", "version_check", @@ -5273,9 +5280,9 @@ dependencies = [ [[package]] name = "keccak" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ecc2af9a1119c51f12a14607e783cb977bde58bc069ff0c3da1095e635d70654" +checksum = "cb26cec98cce3a3d96cbb7bced3c4b16e3d13f27ec56dbd62cbc8f39cfb9d653" dependencies = [ "cpufeatures 0.2.17", ] @@ -8338,13 +8345,11 @@ dependencies = [ "cyper", "cyper-axum", "dashmap", - "derive_more", "dotenvy", "enum_dispatch", "err_trail", "error_set", "figlet-rs", - "figment", "flume 0.12.0", "fs2", "futures", @@ -8375,10 +8380,8 @@ dependencies = [ "rustls-pemfile", "send_wrapper", "serde", - "serde_with", "slab", "socket2 0.6.2", - "static-toml", "strum", "sysinfo 0.38.1", "tempfile", diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md index 4c555b8c7..37680a011 100644 --- a/DEPENDENCIES.md +++ b/DEPENDENCIES.md @@ -439,7 +439,7 @@ journal: 0.1.0, "Apache-2.0", js-sys: 0.3.85, "Apache-2.0 OR MIT", jsonwebtoken: 10.3.0, "MIT", jwalk: 0.8.1, "MIT", -keccak: 0.1.5, "Apache-2.0 OR MIT", +keccak: 0.1.6, "Apache-2.0 OR MIT", keyring: 3.6.3, "Apache-2.0 OR MIT", kqueue: 1.1.1, "MIT", kqueue-sys: 1.0.4, "MIT", diff --git a/core/common/src/alloc/memory_pool.rs b/core/common/src/alloc/memory_pool.rs index 8ebc892c9..9216ef57f 100644 --- a/core/common/src/alloc/memory_pool.rs +++ b/core/common/src/alloc/memory_pool.rs @@ -73,6 +73,9 @@ pub fn memory_pool() -> &'static MemoryPool { .expect("Memory pool not initialized - MemoryPool::init_pool should be called first") } +// TODO: Extract shared domain types (IggyByteSize, IggyDuration, etc.) into an `iggy_types` +// leaf crate so `iggy_common` can depend on `configs` directly. 
That lets us delete this +// duplicate and use `configs::server::MemoryPoolConfig` here instead. /// Configuration for the memory pool. #[derive(Debug)] pub struct MemoryPoolConfigOther { diff --git a/core/configs/Cargo.toml b/core/configs/Cargo.toml index d03f44c04..ed7d6eba3 100644 --- a/core/configs/Cargo.toml +++ b/core/configs/Cargo.toml @@ -24,7 +24,15 @@ publish = false [dependencies] configs_derive = { workspace = true } +derive_more = { workspace = true } +err_trail = { workspace = true } figment = { workspace = true } +iggy_common = { workspace = true } +jsonwebtoken = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } +serde_with = { workspace = true } +static-toml = { workspace = true } +strum = { workspace = true } tracing = { workspace = true } +tungstenite = { workspace = true } diff --git a/core/configs/src/lib.rs b/core/configs/src/lib.rs index d1410ab8a..419c77b8a 100644 --- a/core/configs/src/lib.rs +++ b/core/configs/src/lib.rs @@ -20,9 +20,13 @@ extern crate self as configs; mod configs_impl; - +mod server_config; pub use configs_derive::ConfigEnv; pub use configs_impl::{ ConfigEnvMappings, ConfigProvider, ConfigurationError, ConfigurationType, EnvVarMapping, FileConfigProvider, TypedEnvProvider, parse_env_value_to_json, }; +pub use server_config::{ + COMPONENT, cache_indexes, cluster, defaults, displays, http, quic, server, sharding, system, + tcp, validators, websocket, +}; diff --git a/core/server/src/configs/cache_indexes.rs b/core/configs/src/server_config/cache_indexes.rs similarity index 100% rename from core/server/src/configs/cache_indexes.rs rename to core/configs/src/server_config/cache_indexes.rs diff --git a/core/server/src/configs/cluster.rs b/core/configs/src/server_config/cluster.rs similarity index 100% rename from core/server/src/configs/cluster.rs rename to core/configs/src/server_config/cluster.rs diff --git a/core/server/src/configs/defaults.rs b/core/configs/src/server_config/defaults.rs 
similarity index 96% rename from core/server/src/configs/defaults.rs rename to core/configs/src/server_config/defaults.rs index d2f1e61af..27eae580d 100644 --- a/core/server/src/configs/defaults.rs +++ b/core/configs/src/server_config/defaults.rs @@ -16,35 +16,33 @@ * under the License. */ -use super::sharding::ShardingConfig; -use super::tcp::TcpSocketConfig; -use crate::configs::cluster::CurrentNodeConfig; -use crate::configs::cluster::{ClusterConfig, NodeConfig, OtherNodeConfig, TransportPorts}; -use crate::configs::http::{ - HttpConfig, HttpCorsConfig, HttpJwtConfig, HttpMetricsConfig, HttpTlsConfig, -}; -use crate::configs::quic::{QuicCertificateConfig, QuicConfig, QuicSocketConfig}; -use crate::configs::server::{ +use super::cluster::CurrentNodeConfig; +use super::cluster::{ClusterConfig, NodeConfig, OtherNodeConfig, TransportPorts}; +use super::http::{HttpConfig, HttpCorsConfig, HttpJwtConfig, HttpMetricsConfig, HttpTlsConfig}; +use super::quic::{QuicCertificateConfig, QuicConfig, QuicSocketConfig}; +use super::server::{ ConsumerGroupConfig, DataMaintenanceConfig, HeartbeatConfig, MemoryPoolConfig, MessageSaverConfig, MessagesMaintenanceConfig, PersonalAccessTokenCleanerConfig, PersonalAccessTokenConfig, ServerConfig, TelemetryConfig, TelemetryLogsConfig, TelemetryTracesConfig, }; -use crate::configs::system::{ +use super::sharding::ShardingConfig; +use super::system::{ BackupConfig, CompatibilityConfig, CompressionConfig, EncryptionConfig, LoggingConfig, MessageDeduplicationConfig, PartitionConfig, RecoveryConfig, RuntimeConfig, SegmentConfig, StateConfig, StreamConfig, SystemConfig, TopicConfig, }; -use crate::configs::tcp::{TcpConfig, TcpTlsConfig}; -use crate::configs::websocket::{WebSocketConfig, WebSocketTlsConfig}; +use super::tcp::TcpSocketConfig; +use super::tcp::{TcpConfig, TcpTlsConfig}; +use super::websocket::{WebSocketConfig, WebSocketTlsConfig}; use iggy_common::IggyByteSize; use iggy_common::IggyDuration; use std::sync::Arc; use 
std::time::Duration; static_toml::static_toml! { - // static_toml crate always starts from CARGO_MANIFEST_DIR (core/server) - pub static SERVER_CONFIG = include_toml!("config.toml"); + // static_toml resolves relative to CARGO_MANIFEST_DIR (core/configs/). + pub static SERVER_CONFIG = include_toml!("../server/config.toml"); } impl Default for ServerConfig { diff --git a/core/server/src/configs/displays.rs b/core/configs/src/server_config/displays.rs similarity index 98% rename from core/server/src/configs/displays.rs rename to core/configs/src/server_config/displays.rs index df7741564..959d35476 100644 --- a/core/server/src/configs/displays.rs +++ b/core/configs/src/server_config/displays.rs @@ -17,13 +17,13 @@ * under the License. */ -use crate::configs::quic::{QuicCertificateConfig, QuicConfig}; -use crate::configs::server::{ +use super::quic::{QuicCertificateConfig, QuicConfig}; +use super::server::{ ConsumerGroupConfig, DataMaintenanceConfig, HeartbeatConfig, MessagesMaintenanceConfig, TelemetryConfig, TelemetryLogsConfig, TelemetryTracesConfig, }; -use crate::configs::system::MessageDeduplicationConfig; -use crate::configs::{ +use super::system::MessageDeduplicationConfig; +use super::{ http::{HttpConfig, HttpCorsConfig, HttpJwtConfig, HttpMetricsConfig, HttpTlsConfig}, server::{MessageSaverConfig, ServerConfig}, system::{ diff --git a/core/server/src/configs/http.rs b/core/configs/src/server_config/http.rs similarity index 100% rename from core/server/src/configs/http.rs rename to core/configs/src/server_config/http.rs diff --git a/core/server/src/configs/mod.rs b/core/configs/src/server_config/mod.rs similarity index 100% copy from core/server/src/configs/mod.rs copy to core/configs/src/server_config/mod.rs diff --git a/core/server/src/configs/quic.rs b/core/configs/src/server_config/quic.rs similarity index 100% rename from core/server/src/configs/quic.rs rename to core/configs/src/server_config/quic.rs diff --git a/core/server/src/configs/server.rs 
b/core/configs/src/server_config/server.rs similarity index 93% rename from core/server/src/configs/server.rs rename to core/configs/src/server_config/server.rs index 850384f9e..8c005f599 100644 --- a/core/server/src/configs/server.rs +++ b/core/configs/src/server_config/server.rs @@ -16,14 +16,14 @@ * specific language governing permissions and limitations * under the License. */ -use crate::configs::COMPONENT; -use crate::configs::cluster::ClusterConfig; -use crate::configs::http::HttpConfig; -use crate::configs::quic::QuicConfig; -use crate::configs::system::SystemConfig; -use crate::configs::tcp::TcpConfig; -use crate::configs::websocket::WebSocketConfig; -use crate::server_error::ConfigurationError; +use super::COMPONENT; +use super::cluster::ClusterConfig; +use super::http::HttpConfig; +use super::quic::QuicConfig; +use super::system::SystemConfig; +use super::tcp::TcpConfig; +use super::websocket::WebSocketConfig; +use crate::ConfigurationError; use configs::{ConfigEnv, ConfigEnvMappings, ConfigProvider, FileConfigProvider, TypedEnvProvider}; use derive_more::Display; use err_trail::ErrContext; @@ -66,7 +66,6 @@ pub struct MemoryPoolConfig { pub bucket_capacity: u32, } -// Hack around the fact that we define our config inside of the `server` crate, but `memory_pool` is in `common`. impl MemoryPoolConfig { pub fn into_other(&self) -> MemoryPoolConfigOther { MemoryPoolConfigOther { @@ -204,7 +203,7 @@ impl ServerConfig { /// Create a config provider using compile-time generated env var mappings. 
pub fn config_provider(config_path: &str) -> FileConfigProvider<ServerConfigEnvProvider> { - let default_config = Toml::string(include_str!("../../config.toml")); + let default_config = Toml::string(include_str!("../../../server/config.toml")); FileConfigProvider::new( config_path.to_string(), ServerConfigEnvProvider::default(), diff --git a/core/configs/src/server_config/sharding.rs b/core/configs/src/server_config/sharding.rs new file mode 100644 index 000000000..084c984e1 --- /dev/null +++ b/core/configs/src/server_config/sharding.rs @@ -0,0 +1,261 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use std::str::FromStr; + +use configs::ConfigEnv; + +#[derive(Debug, Deserialize, Serialize, Default, ConfigEnv)] +pub struct ShardingConfig { + #[serde(default)] + #[config_env(leaf)] + pub cpu_allocation: CpuAllocation, +} + +#[derive(Debug, Clone, PartialEq, Default)] +pub enum CpuAllocation { + #[default] + All, + Count(usize), + Range(usize, usize), + NumaAware(NumaConfig), +} + +/// NUMA specific configuration +#[derive(Debug, Clone, PartialEq, Default)] +pub struct NumaConfig { + /// Which NUMA nodes to use (empty = auto-detect all) + pub nodes: Vec<usize>, + /// Cores per node to use (0 = use all available) + pub cores_per_node: usize, + /// skip hyperthread sibling + pub avoid_hyperthread: bool, +} + +impl CpuAllocation { + fn parse_numa(s: &str) -> Result<CpuAllocation, String> { + let params = s + .strip_prefix("numa:") + .ok_or_else(|| "Numa config must start with 'numa:'".to_string())?; + + if params == "auto" { + return Ok(CpuAllocation::NumaAware(NumaConfig { + nodes: vec![], + cores_per_node: 0, + avoid_hyperthread: true, + })); + } + + let mut nodes = Vec::new(); + let mut cores_per_node = 0; + let mut avoid_hyperthread = true; + + for param in params.split(';') { + let kv: Vec<&str> = param.split('=').collect(); + if kv.len() != 2 { + return Err(format!( + "Invalid NUMA parameter: '{param}', only available: 'auto'" + )); + } + + match kv[0] { + "nodes" => { + nodes = kv[1] + .split(',') + .map(|n| { + n.parse::<usize>() + .map_err(|_| format!("Invalid node number: {n}")) + }) + .collect::<Result<Vec<_>, _>>()?; + } + "cores" => { + cores_per_node = kv[1] + .parse::<usize>() + .map_err(|_| format!("Invalid cores value: {}", kv[1]))?; + } + "no_ht" => { + avoid_hyperthread = kv[1] + .parse::<bool>() + .map_err(|_| format!("Invalid no ht value: {}", kv[1]))?; + } + _ => { + return Err(format!( + "Unknown NUMA parameter: {}, example: numa:nodes=0;cores=4;no_ht=true", + kv[0] + )); 
+ } + } + } + + Ok(CpuAllocation::NumaAware(NumaConfig { + nodes, + cores_per_node, + avoid_hyperthread, + })) + } +} + +impl FromStr for CpuAllocation { + type Err = String; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s { + "all" => Ok(CpuAllocation::All), + s if s.starts_with("numa:") => Self::parse_numa(s), + s if s.contains("..") => { + let parts: Vec<&str> = s.split("..").collect(); + if parts.len() != 2 { + return Err(format!("Invalid range format: {s}. Expected 'start..end'")); + } + let start = parts[0] + .parse::<usize>() + .map_err(|_| format!("Invalid start value: {}", parts[0]))?; + let end = parts[1] + .parse::<usize>() + .map_err(|_| format!("Invalid end value: {}", parts[1]))?; + Ok(CpuAllocation::Range(start, end)) + } + s => { + let count = s + .parse::<usize>() + .map_err(|_| format!("Invalid shard count: {s}"))?; + Ok(CpuAllocation::Count(count)) + } + } + } +} + +impl Serialize for CpuAllocation { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + match self { + CpuAllocation::All => serializer.serialize_str("all"), + CpuAllocation::Count(n) => serializer.serialize_u64(*n as u64), + CpuAllocation::Range(start, end) => { + serializer.serialize_str(&format!("{start}..{end}")) + } + CpuAllocation::NumaAware(numa) => { + if numa.nodes.is_empty() && numa.cores_per_node == 0 { + serializer.serialize_str("numa:auto") + } else { + let nodes_str = numa + .nodes + .iter() + .map(|n| n.to_string()) + .collect::<Vec<_>>() + .join(","); + + let full_str = format!( + "numa:nodes={};cores={};no_ht={}", + nodes_str, numa.cores_per_node, numa.avoid_hyperthread + ); + + serializer.serialize_str(&full_str) + } + } + } + } +} + +impl<'de> Deserialize<'de> for CpuAllocation { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: Deserializer<'de>, + { + #[derive(Deserialize)] + #[serde(untagged)] + enum CpuAllocationHelper { + String(String), + Number(usize), + } + + match 
CpuAllocationHelper::deserialize(deserializer)? { + CpuAllocationHelper::String(s) => { + CpuAllocation::from_str(&s).map_err(serde::de::Error::custom) + } + CpuAllocationHelper::Number(n) => Ok(CpuAllocation::Count(n)), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_all() { + assert_eq!(CpuAllocation::from_str("all").unwrap(), CpuAllocation::All); + } + + #[test] + fn test_parse_count() { + assert_eq!( + CpuAllocation::from_str("4").unwrap(), + CpuAllocation::Count(4) + ); + } + + #[test] + fn test_parse_range() { + assert_eq!( + CpuAllocation::from_str("2..8").unwrap(), + CpuAllocation::Range(2, 8) + ); + } + + #[test] + fn test_parse_numa_auto() { + let result = CpuAllocation::from_str("numa:auto").unwrap(); + match result { + CpuAllocation::NumaAware(numa) => { + assert!(numa.nodes.is_empty()); + assert_eq!(numa.cores_per_node, 0); + assert!(numa.avoid_hyperthread); + } + _ => panic!("Expected NumaAware"), + } + } + + #[test] + fn test_parse_numa_explicit() { + let result = CpuAllocation::from_str("numa:nodes=0,1;cores=4;no_ht=true").unwrap(); + match result { + CpuAllocation::NumaAware(numa) => { + assert_eq!(numa.nodes, vec![0, 1]); + assert_eq!(numa.cores_per_node, 4); + assert!(numa.avoid_hyperthread); + } + _ => panic!("Expected NumaAware"), + } + } + + #[test] + fn test_numa_explicit_serde_roundtrip() { + let original = CpuAllocation::NumaAware(NumaConfig { + nodes: vec![0, 1], + cores_per_node: 4, + avoid_hyperthread: true, + }); + let serialized = serde_json::to_string(&original).unwrap(); + let deserialized: CpuAllocation = serde_json::from_str(&serialized).unwrap(); + assert_eq!(original, deserialized); + } +} diff --git a/core/server/src/configs/system.rs b/core/configs/src/server_config/system.rs similarity index 99% rename from core/server/src/configs/system.rs rename to core/configs/src/server_config/system.rs index 58de89ed5..91d9a99ca 100644 --- a/core/server/src/configs/system.rs +++ 
b/core/configs/src/server_config/system.rs @@ -17,8 +17,8 @@ */ use super::cache_indexes::CacheIndexesConfig; +use super::server::MemoryPoolConfig; use super::sharding::ShardingConfig; -use crate::configs::server::MemoryPoolConfig; use configs::ConfigEnv; use iggy_common::IggyByteSize; use iggy_common::IggyError; diff --git a/core/server/src/configs/tcp.rs b/core/configs/src/server_config/tcp.rs similarity index 100% rename from core/server/src/configs/tcp.rs rename to core/configs/src/server_config/tcp.rs diff --git a/core/server/src/configs/validators.rs b/core/configs/src/server_config/validators.rs similarity index 92% rename from core/server/src/configs/validators.rs rename to core/configs/src/server_config/validators.rs index b9d70aaf3..6a49157a8 100644 --- a/core/server/src/configs/validators.rs +++ b/core/configs/src/server_config/validators.rs @@ -17,18 +17,16 @@ * under the License. */ +use super::COMPONENT; use super::cluster::ClusterConfig; use super::server::{ DataMaintenanceConfig, MessageSaverConfig, MessagesMaintenanceConfig, TelemetryConfig, }; +use super::server::{MemoryPoolConfig, PersonalAccessTokenConfig, ServerConfig}; use super::sharding::{CpuAllocation, ShardingConfig}; +use super::system::SegmentConfig; use super::system::{CompressionConfig, LoggingConfig, PartitionConfig}; -use crate::configs::COMPONENT; -use crate::configs::server::{MemoryPoolConfig, PersonalAccessTokenConfig, ServerConfig}; -use crate::configs::sharding::NumaTopology; -use crate::configs::system::SegmentConfig; -use crate::streaming::segments::*; -use configs::ConfigurationError; +use crate::ConfigurationError; use err_trail::ErrContext; use iggy_common::CompressionAlgorithm; use iggy_common::IggyExpiry; @@ -37,6 +35,9 @@ use iggy_common::Validatable; use std::thread::available_parallelism; use tracing::warn; +/// 1 GiB max segment size. Canonical definition; re-exported by core/server streaming. 
+pub const SEGMENT_MAX_SIZE_BYTES: u64 = 1024 * 1024 * 1024; + impl Validatable<ConfigurationError> for ServerConfig { fn validate(&self) -> Result<(), ConfigurationError> { self.system @@ -339,7 +340,10 @@ impl Validatable<ConfigurationError> for MemoryPoolConfig { impl Validatable<ConfigurationError> for ShardingConfig { fn validate(&self) -> Result<(), ConfigurationError> { let available_cpus = available_parallelism() - .expect("Failed to get number of CPU cores") + .map_err(|_| { + eprintln!("Failed to detect available CPU cores"); + ConfigurationError::InvalidConfigurationValue + })? .get(); match &self.cpu_allocation { @@ -372,18 +376,9 @@ impl Validatable<ConfigurationError> for ShardingConfig { } Ok(()) } - CpuAllocation::NumaAware(numa_config) => match NumaTopology::detect() { - // TODO: dry the validation, already validate it from the shard allocation - Ok(topology) => numa_config.validate(&topology).map_err(|e| { - eprintln!("Invalid NUMA configuration: {}", e); - ConfigurationError::InvalidConfigurationValue - }), - Err(e) => { - eprintln!("Failed to detect NUMA topology: {}", e); - eprintln!("NUMA allocation requested but system doesn't support it"); - Err(ConfigurationError::InvalidConfigurationValue) - } - }, + // NUMA topology validation requires hwlocality (runtime dep). + // Full NUMA validation happens in server::shard_allocator at startup. 
+ CpuAllocation::NumaAware(_) => Ok(()), } } } @@ -394,19 +389,16 @@ impl Validatable<ConfigurationError> for ClusterConfig { return Ok(()); } - // Validate cluster name is not empty if self.name.trim().is_empty() { eprintln!("Invalid cluster configuration: cluster name cannot be empty"); return Err(ConfigurationError::InvalidConfigurationValue); } - // Validate current node name is not empty if self.node.current.name.trim().is_empty() { eprintln!("Invalid cluster configuration: current node name cannot be empty"); return Err(ConfigurationError::InvalidConfigurationValue); } - // Check for duplicate node names among other nodes let mut node_names = std::collections::HashSet::new(); node_names.insert(self.node.current.name.clone()); @@ -420,16 +412,13 @@ impl Validatable<ConfigurationError> for ClusterConfig { } } - // Validate each other node configuration let mut used_endpoints = std::collections::HashSet::new(); for node in &self.node.others { - // Validate node name is not empty if node.name.trim().is_empty() { eprintln!("Invalid cluster configuration: node name cannot be empty"); return Err(ConfigurationError::InvalidConfigurationValue); } - // Validate IP is not empty if node.ip.trim().is_empty() { eprintln!( "Invalid cluster configuration: IP cannot be empty for node '{}'", @@ -438,7 +427,6 @@ impl Validatable<ConfigurationError> for ClusterConfig { return Err(ConfigurationError::InvalidConfigurationValue); } - // Validate transport ports if provided let port_list = [ ("TCP", node.ports.tcp), ("QUIC", node.ports.quic), @@ -456,7 +444,6 @@ impl Validatable<ConfigurationError> for ClusterConfig { return Err(ConfigurationError::InvalidConfigurationValue); } - // Check for port conflicts across nodes on the same IP let endpoint = format!("{}:{}:{}", node.ip, name, port); if !used_endpoints.insert(endpoint.clone()) { eprintln!( diff --git a/core/server/src/configs/websocket.rs b/core/configs/src/server_config/websocket.rs similarity index 100% rename from 
core/server/src/configs/websocket.rs rename to core/configs/src/server_config/websocket.rs diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index 057ae6281..39c3c5de1 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -54,13 +54,11 @@ ctrlc = { workspace = true } cyper = { workspace = true } cyper-axum = { workspace = true } dashmap = { workspace = true } -derive_more = { workspace = true } dotenvy = { workspace = true } enum_dispatch = { workspace = true } err_trail = { workspace = true } error_set = { workspace = true } figlet-rs = { workspace = true } -figment = { workspace = true } flume = { workspace = true } fs2 = { workspace = true } futures = { workspace = true } @@ -90,10 +88,8 @@ rustls = { workspace = true } rustls-pemfile = { workspace = true } send_wrapper = { workspace = true } serde = { workspace = true } -serde_with = { workspace = true } slab = { workspace = true } socket2 = { workspace = true } -static-toml = { workspace = true } strum = { workspace = true } sysinfo = { workspace = true } tempfile = { workspace = true } @@ -115,5 +111,4 @@ hwlocality = { workspace = true } hwlocality = { workspace = true, features = ["vendored"] } [build-dependencies] -figment = { workspace = true, features = ["json", "toml", "env"] } vergen-git2 = { workspace = true } diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs index 3d95fb9c3..02bd742f2 100644 --- a/core/server/src/bootstrap.rs +++ b/core/server/src/bootstrap.rs @@ -23,7 +23,6 @@ use crate::{ configs::{ cache_indexes::CacheIndexesConfig, server::ServerConfig, - sharding::ShardInfo, system::{INDEX_EXTENSION, LOG_EXTENSION, SystemConfig}, }, io::fs_utils::{self, DirEntry}, @@ -36,6 +35,7 @@ use crate::{ frame::ShardFrame, }, }, + shard_allocator::ShardInfo, state::system::{StreamState, TopicState, UserState}, streaming::{ partitions::{ diff --git a/core/server/src/configs/mod.rs b/core/server/src/configs.rs similarity index 76% rename from 
core/server/src/configs/mod.rs rename to core/server/src/configs.rs index 8bee32c44..dac4d0953 100644 --- a/core/server/src/configs/mod.rs +++ b/core/server/src/configs.rs @@ -17,17 +17,7 @@ * under the License. */ -pub mod cache_indexes; -pub mod cluster; -pub mod defaults; -pub mod displays; -pub mod http; -pub mod quic; -pub mod server; -pub mod sharding; -pub mod system; -pub mod tcp; -pub mod validators; -pub mod websocket; - -pub const COMPONENT: &str = "CONFIG"; +pub use configs::{ + COMPONENT, cache_indexes, cluster, defaults, displays, http, quic, server, sharding, system, + tcp, validators, websocket, +}; diff --git a/core/server/src/configs/sharding.rs b/core/server/src/configs/sharding.rs deleted file mode 100644 index 5224c0b47..000000000 --- a/core/server/src/configs/sharding.rs +++ /dev/null @@ -1,637 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -use hwlocality::Topology; -use hwlocality::bitmap::SpecializedBitmapRef; -use hwlocality::cpu::cpuset::CpuSet; -use hwlocality::memory::binding::{MemoryBindingFlags, MemoryBindingPolicy}; -use hwlocality::object::types::ObjectType::{self, NUMANode}; -#[cfg(target_os = "linux")] -use nix::{sched::sched_setaffinity, unistd::Pid}; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use std::collections::HashSet; -use std::str::FromStr; -use std::sync::Arc; -use std::thread::available_parallelism; -use tracing::info; - -use crate::server_error::ServerError; -use configs::ConfigEnv; - -#[derive(Debug, Deserialize, Serialize, Default, ConfigEnv)] -pub struct ShardingConfig { - #[serde(default)] - #[config_env(leaf)] - pub cpu_allocation: CpuAllocation, -} - -#[derive(Debug, Clone, PartialEq, Default)] -pub enum CpuAllocation { - #[default] - All, - Count(usize), - Range(usize, usize), - NumaAware(NumaConfig), -} - -/// NUMA specific configuration -#[derive(Debug, Clone, PartialEq, Default)] -pub struct NumaConfig { - /// Which NUMA nodes to use (empty = auto-detect all) - pub nodes: Vec<usize>, - /// Cores per node to use (0 = use all available) - pub cores_per_node: usize, - /// skip hyperthread sibling - pub avoid_hyperthread: bool, -} - -impl NumaConfig { - pub fn validate(&self, topology: &NumaTopology) -> Result<(), ServerError> { - let available_nodes = topology.node_count; - - if available_nodes == 0 { - return Err(ServerError::NoNumaNodes); - } - - for &node in &self.nodes { - if node >= available_nodes { - return Err(ServerError::InvalidNode { - requested: node, - available: available_nodes, - }); - } - } - - // Validate core per node - if self.cores_per_node > 0 { - for &node in &self.nodes { - let available_cores = if self.avoid_hyperthread { - topology.physical_cores_for_node(node) - } else { - topology.logical_cores_for_node(node) - }; - - info!( - "core_per_node: {}, available_cores: {}", - self.cores_per_node, available_cores - ); - - 
if self.cores_per_node > available_cores { - return Err(ServerError::InsufficientCores { - requested: self.cores_per_node, - available: available_cores, - node, - }); - } - } - } - - Ok(()) - } -} - -impl CpuAllocation { - fn parse_numa(s: &str) -> Result<CpuAllocation, String> { - let params = s - .strip_prefix("numa:") - .ok_or_else(|| "Numa config must start with 'numa:'".to_string())?; - - if params == "auto" { - return Ok(CpuAllocation::NumaAware(NumaConfig { - nodes: vec![], - cores_per_node: 0, - avoid_hyperthread: true, - })); - } - - let mut nodes = Vec::new(); - let mut cores_per_node = 0; - let mut avoid_hyperthread = true; - - for param in params.split(';') { - let kv: Vec<&str> = param.split('=').collect(); - if kv.len() != 2 { - return Err(format!( - "Invalid NUMA parameter: '{param}', only available: 'auto'" - )); - } - - match kv[0] { - "nodes" => { - nodes = kv[1] - .split(',') - .map(|n| { - n.parse::<usize>() - .map_err(|_| format!("Invalid node number: {n}")) - }) - .collect::<Result<Vec<_>, _>>()?; - } - "cores" => { - cores_per_node = kv[1] - .parse::<usize>() - .map_err(|_| format!("Invalid cores value: {}", kv[1]))?; - } - "no_ht" => { - avoid_hyperthread = kv[1] - .parse::<bool>() - .map_err(|_| format!("Invalid no ht value: {}", kv[1]))?; - } - _ => { - return Err(format!( - "Unknown NUMA parameter: {}, example: numa:nodes=0;cores=4;no_ht=true", - kv[0] - )); - } - } - } - - Ok(CpuAllocation::NumaAware(NumaConfig { - nodes, - cores_per_node, - avoid_hyperthread, - })) - } -} - -#[derive(Debug)] -pub struct NumaTopology { - topology: Topology, - node_count: usize, - physical_cores_per_node: Vec<usize>, - logical_cores_per_node: Vec<usize>, -} - -impl NumaTopology { - pub fn detect() -> Result<NumaTopology, ServerError> { - let topology = - Topology::new().map_err(|e| ServerError::TopologyDetection { msg: e.to_string() })?; - - let numa_nodes: Vec<_> = topology.objects_with_type(NUMANode).collect(); - - let node_count = numa_nodes.len(); - - 
if node_count == 0 { - return Err(ServerError::NoNumaNodes); - } - - let mut physical_cores_per_node = Vec::new(); - let mut logical_cores_per_node = Vec::new(); - - for node in numa_nodes { - let cpuset = node.cpuset().ok_or(ServerError::TopologyDetection { - msg: "NUMA node has no CPU set".to_string(), - })?; - - let logical_cores = cpuset.weight().unwrap_or(0); - - let physical_cores = topology - .objects_with_type(ObjectType::Core) - .filter(|core| { - if let Some(core_cpuset) = core.cpuset() { - !(cpuset & core_cpuset).is_empty() - } else { - false - } - }) - .count(); - - physical_cores_per_node.push(physical_cores); - logical_cores_per_node.push(logical_cores); - } - - Ok(Self { - topology, - node_count, - physical_cores_per_node, - logical_cores_per_node, - }) - } - - pub fn physical_cores_for_node(&self, node: usize) -> usize { - self.physical_cores_per_node.get(node).copied().unwrap_or(0) - } - - pub fn logical_cores_for_node(&self, node: usize) -> usize { - self.logical_cores_per_node.get(node).copied().unwrap_or(0) - } - - fn filter_physical_cores(&self, node_cpuset: CpuSet) -> CpuSet { - let mut physical_cpuset = CpuSet::new(); - for core in self.topology.objects_with_type(ObjectType::Core) { - if let Some(core_cpuset) = core.cpuset() { - let intersection = node_cpuset.clone() & core_cpuset; - if !intersection.is_empty() { - // Take the minimum (first) CPU ID for consistency - if let Some(first_cpu) = intersection.iter_set().min() { - physical_cpuset.set(first_cpu) - } - } - } - } - physical_cpuset - } - - /// Get CPU set for a NUMA node - fn get_cpuset_for_node( - &self, - node_id: usize, - avoid_hyperthread: bool, - ) -> Result<CpuSet, ServerError> { - let node = self - .topology - .objects_with_type(ObjectType::NUMANode) - .nth(node_id) - .ok_or(ServerError::InvalidNode { - requested: node_id, - available: self.node_count, - })?; - - let cpuset_ref = node.cpuset().ok_or(ServerError::TopologyDetection { - msg: format!("Node {} has no CPU set", 
node_id), - })?; - - let cpuset = SpecializedBitmapRef::to_owned(&cpuset_ref); - - if avoid_hyperthread { - Ok(self.filter_physical_cores(cpuset)) - } else { - Ok(cpuset) - } - } -} - -#[derive(Debug, Clone)] -pub struct ShardInfo { - /// CPUs this shard should use - pub cpu_set: HashSet<usize>, - /// NUMA node - pub numa_node: Option<usize>, -} - -impl ShardInfo { - pub fn bind_cpu(&self) -> Result<(), ServerError> { - #[cfg(target_os = "linux")] - { - if self.cpu_set.is_empty() { - return Ok(()); - } - - let mut cpuset = nix::sched::CpuSet::new(); - for &cpu in &self.cpu_set { - cpuset.set(cpu).map_err(|_| ServerError::BindingFailed)?; - } - - sched_setaffinity(Pid::from_raw(0), &cpuset).map_err(|e| { - tracing::error!("Failed to set CPU affinity: {:?}", e); - ServerError::BindingFailed - })?; - - info!("Thread bound to CPUs: {:?}", self.cpu_set); - } - - #[cfg(not(target_os = "linux"))] - { - tracing::debug!("CPU affinity binding skipped on non-Linux platform"); - } - - Ok(()) - } - - pub fn bind_memory(&self) -> Result<(), ServerError> { - if let Some(node_id) = self.numa_node { - let topology = Topology::new().map_err(|err| ServerError::TopologyDetection { - msg: err.to_string(), - })?; - - let node = topology - .objects_with_type(ObjectType::NUMANode) - .nth(node_id) - .ok_or(ServerError::InvalidNode { - requested: node_id, - available: topology.objects_with_type(ObjectType::NUMANode).count(), - })?; - - if let Some(nodeset) = node.nodeset() { - topology - .bind_memory( - nodeset, - MemoryBindingPolicy::Bind, - MemoryBindingFlags::THREAD | MemoryBindingFlags::STRICT, - ) - .map_err(|err| { - tracing::error!("Failed to bind memory {:?}", err); - ServerError::BindingFailed - })?; - - info!("Memory bound to NUMA node {node_id}"); - } - } - - Ok(()) - } -} - -pub struct ShardAllocator { - allocation: CpuAllocation, - topology: Option<Arc<NumaTopology>>, -} - -impl ShardAllocator { - pub fn new(allocation: &CpuAllocation) -> Result<ShardAllocator, ServerError> { - 
let topology = if matches!(allocation, CpuAllocation::NumaAware(_)) { - let numa_topology = NumaTopology::detect()?; - - Some(Arc::new(numa_topology)) - } else { - None - }; - - Ok(Self { - allocation: allocation.clone(), - topology, - }) - } - - pub fn to_shard_assignments(&self) -> Result<Vec<ShardInfo>, ServerError> { - match &self.allocation { - CpuAllocation::All => { - let available_cpus = available_parallelism() - .map_err(|err| ServerError::Other { - msg: format!("Failed to get available_parallelism: {:?}", err), - })? - .get(); - - let shard_assignments: Vec<_> = (0..available_cpus) - .map(|cpu_id| ShardInfo { - cpu_set: HashSet::from([cpu_id]), - numa_node: None, - }) - .collect(); - - info!( - "Using all available CPU cores ({} shards with affinity)", - shard_assignments.len() - ); - - Ok(shard_assignments) - } - CpuAllocation::Count(count) => { - let shard_assignments = (0..*count) - .map(|cpu_id| ShardInfo { - cpu_set: HashSet::from([cpu_id]), - numa_node: None, - }) - .collect(); - - info!("Using {count} shards with affinity to cores 0..{count}"); - - Ok(shard_assignments) - } - CpuAllocation::Range(start, end) => { - let shard_assignments = (*start..*end) - .map(|cpu_id| ShardInfo { - cpu_set: HashSet::from([cpu_id]), - numa_node: None, - }) - .collect(); - - info!( - "Using {} shards with affinity to cores {start}..{end}", - end - start - ); - - Ok(shard_assignments) - } - CpuAllocation::NumaAware(numa_config) => { - let topology = self.topology.as_ref().ok_or(ServerError::NoTopology)?; - self.compute_numa_assignments(topology, numa_config) - } - } - } - - fn compute_numa_assignments( - &self, - topology: &NumaTopology, - numa: &NumaConfig, - ) -> Result<Vec<ShardInfo>, ServerError> { - // Determine which noes to use - let nodes = if numa.nodes.is_empty() { - // Auto: use all nodes - (0..topology.node_count).collect() - } else { - numa.nodes.clone() - }; - - // Determine cores per node - let cores_per_node = if numa.cores_per_node == 0 { - // Auto: 
use all available - if numa.avoid_hyperthread { - topology.physical_cores_for_node(nodes[0]) - } else { - topology.logical_cores_for_node(nodes[0]) - } - } else { - numa.cores_per_node - }; - - let mut shard_infos = Vec::new(); - - let node_cpus: Vec<Vec<usize>> = nodes - .iter() - .map(|&node_id| { - let cpuset = topology.get_cpuset_for_node(node_id, numa.avoid_hyperthread)?; - Ok(cpuset.iter_set().map(usize::from).collect()) - }) - .collect::<Result<_, ServerError>>()?; - - // For each node, create one shard per core (thread-per-core) - for (idx, &node_id) in nodes.iter().enumerate() { - let available_cpus = &node_cpus[idx]; - - // Take first core_per_node CPUs from this node - let cores_to_use: Vec<usize> = available_cpus - .iter() - .take(cores_per_node) - .copied() - .collect(); - - if cores_to_use.len() < cores_per_node { - return Err(ServerError::InsufficientCores { - requested: cores_per_node, - available: available_cpus.len(), - node: node_id, - }); - } - - // Create one shard per core - for cpu_id in cores_to_use { - shard_infos.push(ShardInfo { - cpu_set: HashSet::from([cpu_id]), - numa_node: Some(node_id), - }); - } - } - - info!( - "Using {} shards with {} NUMA node, {} cores per node, and avoid hyperthread {}", - shard_infos.len(), - nodes.len(), - cores_per_node, - numa.avoid_hyperthread - ); - - Ok(shard_infos) - } -} - -impl FromStr for CpuAllocation { - type Err = String; - - fn from_str(s: &str) -> Result<Self, Self::Err> { - match s { - "all" => Ok(CpuAllocation::All), - s if s.starts_with("numa:") => Self::parse_numa(s), - s if s.contains("..") => { - let parts: Vec<&str> = s.split("..").collect(); - if parts.len() != 2 { - return Err(format!("Invalid range format: {s}. 
Expected 'start..end'")); - } - let start = parts[0] - .parse::<usize>() - .map_err(|_| format!("Invalid start value: {}", parts[0]))?; - let end = parts[1] - .parse::<usize>() - .map_err(|_| format!("Invalid end value: {}", parts[1]))?; - Ok(CpuAllocation::Range(start, end)) - } - s => { - let count = s - .parse::<usize>() - .map_err(|_| format!("Invalid shard count: {s}"))?; - Ok(CpuAllocation::Count(count)) - } - } - } -} - -impl Serialize for CpuAllocation { - fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> - where - S: Serializer, - { - match self { - CpuAllocation::All => serializer.serialize_str("all"), - CpuAllocation::Count(n) => serializer.serialize_u64(*n as u64), - CpuAllocation::Range(start, end) => { - serializer.serialize_str(&format!("{start}..{end}")) - } - CpuAllocation::NumaAware(numa) => { - if numa.nodes.is_empty() && numa.cores_per_node == 0 { - serializer.serialize_str("numa:auto") - } else { - let nodes_str = numa - .nodes - .iter() - .map(|n| n.to_string()) - .collect::<Vec<_>>() - .join(","); - - let full_str = format!( - "numa:nodes={};cores:{};no_ht={}", - nodes_str, numa.cores_per_node, numa.avoid_hyperthread - ); - - serializer.serialize_str(&full_str) - } - } - } - } -} - -impl<'de> Deserialize<'de> for CpuAllocation { - fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> - where - D: Deserializer<'de>, - { - #[derive(Deserialize)] - #[serde(untagged)] - enum CpuAllocationHelper { - String(String), - Number(usize), - } - - match CpuAllocationHelper::deserialize(deserializer)? 
{ - CpuAllocationHelper::String(s) => { - CpuAllocation::from_str(&s).map_err(serde::de::Error::custom) - } - CpuAllocationHelper::Number(n) => Ok(CpuAllocation::Count(n)), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_parse_all() { - assert_eq!(CpuAllocation::from_str("all").unwrap(), CpuAllocation::All); - } - - #[test] - fn test_parse_count() { - assert_eq!( - CpuAllocation::from_str("4").unwrap(), - CpuAllocation::Count(4) - ); - } - - #[test] - fn test_parse_range() { - assert_eq!( - CpuAllocation::from_str("2..8").unwrap(), - CpuAllocation::Range(2, 8) - ); - } - - #[test] - fn test_parse_numa_auto() { - let result = CpuAllocation::from_str("numa:auto").unwrap(); - match result { - CpuAllocation::NumaAware(numa) => { - assert!(numa.nodes.is_empty()); - assert_eq!(numa.cores_per_node, 0); - assert!(numa.avoid_hyperthread); - } - _ => panic!("Expected NumaAware"), - } - } - - #[test] - fn test_parse_numa_explicit() { - let result = CpuAllocation::from_str("numa:nodes=0,1;cores=4;no_ht=true").unwrap(); - match result { - CpuAllocation::NumaAware(numa) => { - assert_eq!(numa.nodes, vec![0, 1]); - assert_eq!(numa.cores_per_node, 4); - assert!(numa.avoid_hyperthread); - } - _ => panic!("Expected NumaAware"), - } - } -} diff --git a/core/server/src/lib.rs b/core/server/src/lib.rs index 1fc0d0d3f..b4bac970d 100644 --- a/core/server/src/lib.rs +++ b/core/server/src/lib.rs @@ -42,6 +42,7 @@ pub mod metadata; pub mod quic; pub mod server_error; pub mod shard; +pub mod shard_allocator; pub mod state; pub mod streaming; pub mod tcp; diff --git a/core/server/src/main.rs b/core/server/src/main.rs index d3ce11f49..6a7fc1e06 100644 --- a/core/server/src/main.rs +++ b/core/server/src/main.rs @@ -32,7 +32,6 @@ use server::bootstrap::{ create_directories, create_shard_connections, create_shard_executor, load_config, load_metadata, resolve_persister, update_system_info, }; -use server::configs::sharding::ShardAllocator; use 
server::diagnostics::{print_io_uring_permission_info, print_locked_memory_limit_info}; use server::io::fs_utils; use server::log::logger::Logging; @@ -40,6 +39,7 @@ use server::metadata::{Metadata, create_metadata_handles}; use server::server_error::ServerError; use server::shard::system::info::SystemInfo; use server::shard::{IggyShard, calculate_shard_assignment}; +use server::shard_allocator::ShardAllocator; use server::state::file::FileState; use server::state::system::SystemState; use server::streaming::clients::client_manager::{Client, ClientManager}; diff --git a/core/server/src/server_error.rs b/core/server/src/server_error.rs index 74dd71118..6f0124eb2 100644 --- a/core/server/src/server_error.rs +++ b/core/server/src/server_error.rs @@ -37,34 +37,8 @@ error_set!( } NumaError := { - #[display("Failed to detect topology: {}", msg)] - TopologyDetection { - msg: String - }, - - #[display("There is no NUMA node on this server")] - NoNumaNodes, - - #[display("No Topology")] - NoTopology, - - #[display("Binding Failed")] - BindingFailed, - - #[display("Insufficient cores on node {}: requested {}, only {} available", node, requested, available)] - InsufficientCores { - requested: usize, - available: usize, - node: usize, - }, - - #[display("Invalid NUMA node: requested {}, only available {} node", requested, available)] - InvalidNode { requested: usize, available: usize }, - - #[display("Other error: {}", msg)] - Other { - msg: String - }, + #[display("{0}")] + Sharding(crate::shard_allocator::ShardingError), } ConfigurationError := { diff --git a/core/server/src/shard_allocator.rs b/core/server/src/shard_allocator.rs new file mode 100644 index 000000000..334c09f28 --- /dev/null +++ b/core/server/src/shard_allocator.rs @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use configs::sharding::{CpuAllocation, NumaConfig}; +use hwlocality::Topology; +use hwlocality::bitmap::SpecializedBitmapRef; +use hwlocality::cpu::cpuset::CpuSet; +use hwlocality::memory::binding::{MemoryBindingFlags, MemoryBindingPolicy}; +use hwlocality::object::types::ObjectType::{self, NUMANode}; +#[cfg(target_os = "linux")] +use nix::{sched::sched_setaffinity, unistd::Pid}; +use std::collections::HashSet; +use std::sync::Arc; +use std::thread::available_parallelism; +use tracing::info; + +#[derive(Debug, thiserror::Error)] +pub enum ShardingError { + #[error("Failed to detect topology: {msg}")] + TopologyDetection { msg: String }, + + #[error("There is no NUMA node on this server")] + NoNumaNodes, + + #[error("No Topology")] + NoTopology, + + #[error("Binding Failed")] + BindingFailed, + + #[error("Insufficient cores on node {node}: requested {requested}, only {available} available")] + InsufficientCores { + requested: usize, + available: usize, + node: usize, + }, + + #[error("Invalid NUMA node: requested {requested}, only {available} nodes available")] + InvalidNode { requested: usize, available: usize }, + + #[error("Other error: {msg}")] + Other { msg: String }, +} + +#[derive(Debug)] +pub struct NumaTopology { + topology: Topology, + node_count: usize, + physical_cores_per_node: Vec<usize>, + logical_cores_per_node: Vec<usize>, +} + +impl NumaTopology { + pub fn 
detect() -> Result<NumaTopology, ShardingError> { + let topology = + Topology::new().map_err(|e| ShardingError::TopologyDetection { msg: e.to_string() })?; + + let numa_nodes: Vec<_> = topology.objects_with_type(NUMANode).collect(); + + let node_count = numa_nodes.len(); + + if node_count == 0 { + return Err(ShardingError::NoNumaNodes); + } + + let mut physical_cores_per_node = Vec::new(); + let mut logical_cores_per_node = Vec::new(); + + for node in numa_nodes { + let cpuset = node.cpuset().ok_or(ShardingError::TopologyDetection { + msg: "NUMA node has no CPU set".to_string(), + })?; + + let logical_cores = cpuset.weight().unwrap_or(0); + + let physical_cores = topology + .objects_with_type(ObjectType::Core) + .filter(|core| { + if let Some(core_cpuset) = core.cpuset() { + !(cpuset & core_cpuset).is_empty() + } else { + false + } + }) + .count(); + + physical_cores_per_node.push(physical_cores); + logical_cores_per_node.push(logical_cores); + } + + Ok(Self { + topology, + node_count, + physical_cores_per_node, + logical_cores_per_node, + }) + } + + pub fn physical_cores_for_node(&self, node: usize) -> usize { + self.physical_cores_per_node.get(node).copied().unwrap_or(0) + } + + pub fn logical_cores_for_node(&self, node: usize) -> usize { + self.logical_cores_per_node.get(node).copied().unwrap_or(0) + } + + fn filter_physical_cores(&self, node_cpuset: CpuSet) -> CpuSet { + let mut physical_cpuset = CpuSet::new(); + for core in self.topology.objects_with_type(ObjectType::Core) { + if let Some(core_cpuset) = core.cpuset() { + let intersection = node_cpuset.clone() & core_cpuset; + if !intersection.is_empty() + && let Some(first_cpu) = intersection.iter_set().min() + { + physical_cpuset.set(first_cpu) + } + } + } + physical_cpuset + } + + fn get_cpuset_for_node( + &self, + node_id: usize, + avoid_hyperthread: bool, + ) -> Result<CpuSet, ShardingError> { + let node = self + .topology + .objects_with_type(ObjectType::NUMANode) + .nth(node_id) + 
.ok_or(ShardingError::InvalidNode { + requested: node_id, + available: self.node_count, + })?; + + let cpuset_ref = node.cpuset().ok_or(ShardingError::TopologyDetection { + msg: format!("Node {} has no CPU set", node_id), + })?; + + let cpuset = SpecializedBitmapRef::to_owned(&cpuset_ref); + + if avoid_hyperthread { + Ok(self.filter_physical_cores(cpuset)) + } else { + Ok(cpuset) + } + } +} + +#[derive(Debug, Clone)] +pub struct ShardInfo { + pub cpu_set: HashSet<usize>, + pub numa_node: Option<usize>, +} + +impl ShardInfo { + pub fn bind_cpu(&self) -> Result<(), ShardingError> { + #[cfg(target_os = "linux")] + { + if self.cpu_set.is_empty() { + return Ok(()); + } + + let mut cpuset = nix::sched::CpuSet::new(); + for &cpu in &self.cpu_set { + cpuset.set(cpu).map_err(|_| ShardingError::BindingFailed)?; + } + + sched_setaffinity(Pid::from_raw(0), &cpuset).map_err(|e| { + tracing::error!("Failed to set CPU affinity: {:?}", e); + ShardingError::BindingFailed + })?; + + info!("Thread bound to CPUs: {:?}", self.cpu_set); + } + + #[cfg(not(target_os = "linux"))] + { + tracing::debug!("CPU affinity binding skipped on non-Linux platform"); + } + + Ok(()) + } + + pub fn bind_memory(&self) -> Result<(), ShardingError> { + if let Some(node_id) = self.numa_node { + let topology = Topology::new().map_err(|err| ShardingError::TopologyDetection { + msg: err.to_string(), + })?; + + let node = topology + .objects_with_type(ObjectType::NUMANode) + .nth(node_id) + .ok_or(ShardingError::InvalidNode { + requested: node_id, + available: topology.objects_with_type(ObjectType::NUMANode).count(), + })?; + + if let Some(nodeset) = node.nodeset() { + topology + .bind_memory( + nodeset, + MemoryBindingPolicy::Bind, + MemoryBindingFlags::THREAD | MemoryBindingFlags::STRICT, + ) + .map_err(|err| { + tracing::error!("Failed to bind memory {:?}", err); + ShardingError::BindingFailed + })?; + + info!("Memory bound to NUMA node {node_id}"); + } + } + + Ok(()) + } +} + +pub struct ShardAllocator { + 
allocation: CpuAllocation, + topology: Option<Arc<NumaTopology>>, +} + +impl ShardAllocator { + pub fn new(allocation: &CpuAllocation) -> Result<ShardAllocator, ShardingError> { + let topology = if matches!(allocation, CpuAllocation::NumaAware(_)) { + let numa_topology = NumaTopology::detect()?; + + Some(Arc::new(numa_topology)) + } else { + None + }; + + Ok(Self { + allocation: allocation.clone(), + topology, + }) + } + + pub fn to_shard_assignments(&self) -> Result<Vec<ShardInfo>, ShardingError> { + match &self.allocation { + CpuAllocation::All => { + let available_cpus = available_parallelism() + .map_err(|err| ShardingError::Other { + msg: format!("Failed to get available_parallelism: {:?}", err), + })? + .get(); + + let shard_assignments: Vec<_> = (0..available_cpus) + .map(|cpu_id| ShardInfo { + cpu_set: HashSet::from([cpu_id]), + numa_node: None, + }) + .collect(); + + info!( + "Using all available CPU cores ({} shards with affinity)", + shard_assignments.len() + ); + + Ok(shard_assignments) + } + CpuAllocation::Count(count) => { + let shard_assignments = (0..*count) + .map(|cpu_id| ShardInfo { + cpu_set: HashSet::from([cpu_id]), + numa_node: None, + }) + .collect(); + + info!("Using {count} shards with affinity to cores 0..{count}"); + + Ok(shard_assignments) + } + CpuAllocation::Range(start, end) => { + let shard_assignments = (*start..*end) + .map(|cpu_id| ShardInfo { + cpu_set: HashSet::from([cpu_id]), + numa_node: None, + }) + .collect(); + + info!( + "Using {} shards with affinity to cores {start}..{end}", + end - start + ); + + Ok(shard_assignments) + } + CpuAllocation::NumaAware(numa_config) => { + let topology = self.topology.as_ref().ok_or(ShardingError::NoTopology)?; + self.compute_numa_assignments(topology, numa_config) + } + } + } + + fn compute_numa_assignments( + &self, + topology: &NumaTopology, + numa: &NumaConfig, + ) -> Result<Vec<ShardInfo>, ShardingError> { + let nodes = if numa.nodes.is_empty() { + (0..topology.node_count).collect() + } 
else { + numa.nodes.clone() + }; + + let cores_per_node = if numa.cores_per_node == 0 { + if numa.avoid_hyperthread { + topology.physical_cores_for_node(nodes[0]) + } else { + topology.logical_cores_for_node(nodes[0]) + } + } else { + numa.cores_per_node + }; + + let mut shard_infos = Vec::new(); + + let node_cpus: Vec<Vec<usize>> = nodes + .iter() + .map(|&node_id| { + let cpuset = topology.get_cpuset_for_node(node_id, numa.avoid_hyperthread)?; + Ok(cpuset.iter_set().map(usize::from).collect()) + }) + .collect::<Result<_, ShardingError>>()?; + + for (idx, &node_id) in nodes.iter().enumerate() { + let available_cpus = &node_cpus[idx]; + + let cores_to_use: Vec<usize> = available_cpus + .iter() + .take(cores_per_node) + .copied() + .collect(); + + if cores_to_use.len() < cores_per_node { + return Err(ShardingError::InsufficientCores { + requested: cores_per_node, + available: available_cpus.len(), + node: node_id, + }); + } + + for cpu_id in cores_to_use { + shard_infos.push(ShardInfo { + cpu_set: HashSet::from([cpu_id]), + numa_node: Some(node_id), + }); + } + } + + info!( + "Using {} shards with {} NUMA nodes, {} cores per node, and avoid hyperthread {}", + shard_infos.len(), + nodes.len(), + cores_per_node, + numa.avoid_hyperthread + ); + + Ok(shard_infos) + } +} diff --git a/core/server/src/streaming/segments/mod.rs b/core/server/src/streaming/segments/mod.rs index 878af0eb1..4b3b883d9 100644 --- a/core/server/src/streaming/segments/mod.rs +++ b/core/server/src/streaming/segments/mod.rs @@ -32,4 +32,4 @@ pub use types::IggyMessageViewMut; pub use types::IggyMessagesBatchMut; pub use types::IggyMessagesBatchSet; -pub const SEGMENT_MAX_SIZE_BYTES: u64 = 1024 * 1024 * 1024; +pub use crate::configs::validators::SEGMENT_MAX_SIZE_BYTES;
