This is an automated email from the ASF dual-hosted git repository.

maciej pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 9541332d1 fix(server): prevent storing consumer offsets beyond max 
partition range (#2794)
9541332d1 is described below

commit 9541332d18bcf83fc0390f8ffc83a645f44ea3d4
Author: omnitrix <[email protected]>
AuthorDate: Tue Feb 24 17:46:07 2026 +0800

    fix(server): prevent storing consumer offsets beyond max partition range 
(#2794)
    
    closes #2730
---
 core/integration/tests/server/general.rs           |  18 +-
 .../scenarios/invalid_consumer_offset_scenario.rs  | 236 +++++++++++++++++++++
 core/integration/tests/server/scenarios/mod.rs     |   1 +
 core/server/src/shard/system/consumer_offsets.rs   |  11 +-
 core/server/src/shard/system/utils.rs              |  27 +++
 .../blocking/ConsumerOffsetsClientBaseTest.java    |   7 +
 6 files changed, 288 insertions(+), 12 deletions(-)

diff --git a/core/integration/tests/server/general.rs 
b/core/integration/tests/server/general.rs
index 13b7c1dfa..f98ac885d 100644
--- a/core/integration/tests/server/general.rs
+++ b/core/integration/tests/server/general.rs
@@ -17,8 +17,9 @@
 
 use crate::server::scenarios::{
     authentication_scenario, bench_scenario, 
consumer_timestamp_polling_scenario,
-    create_message_payload, message_headers_scenario, permissions_scenario, 
snapshot_scenario,
-    stream_size_validation_scenario, system_scenario, user_scenario,
+    create_message_payload, invalid_consumer_offset_scenario, 
message_headers_scenario,
+    permissions_scenario, snapshot_scenario, stream_size_validation_scenario, 
system_scenario,
+    user_scenario,
 };
 use integration::iggy_harness;
 
@@ -151,3 +152,16 @@ async fn consumer_timestamp_polling(harness: &TestHarness) 
{
 async fn snapshot(harness: &TestHarness) {
     snapshot_scenario::run(harness).await;
 }
+
+#[iggy_harness(
+    test_client_transport = [Tcp, Http, Quic, WebSocket],
+    server(
+        tcp.socket.override_defaults = true,
+        tcp.socket.nodelay = true,
+        quic.max_idle_timeout = "500s",
+        quic.keep_alive_interval = "15s"
+    )
+)]
+async fn invalid_consumer_offset(harness: &TestHarness) {
+    invalid_consumer_offset_scenario::run(harness).await;
+}
diff --git 
a/core/integration/tests/server/scenarios/invalid_consumer_offset_scenario.rs 
b/core/integration/tests/server/scenarios/invalid_consumer_offset_scenario.rs
new file mode 100644
index 000000000..f612f6850
--- /dev/null
+++ 
b/core/integration/tests/server/scenarios/invalid_consumer_offset_scenario.rs
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use bytes::Bytes;
+use iggy::prelude::*;
+use iggy_common::IggyError;
+use integration::harness::{TestHarness, assert_clean_system};
+
+const STREAM_NAME: &str = "test-stream-offsets";
+const TOPIC_NAME: &str = "test-topic-offsets";
+const PARTITIONS_COUNT: u32 = 1;
+const PARTITION_ID: u32 = 0;
+const CONSUMER_NAME: &str = "test-consumer";
+const CONSUMER_GROUP_NAME: &str = "test-consumer-group-offsets";
+const MESSAGES_COUNT: u32 = 5;
+
+fn assert_invalid_offset_error(err: &IggyError, expected_offset: u64) {
+    let expected_code = IggyError::InvalidOffset(expected_offset).as_code();
+
+    // HTTP client currently doesn't deserialize the ErrorResponse body into 
specific IggyError
+    // variants for 400 Bad Request. Until it does, we must manually check the 
JSON body for the expected error ID.
+    let is_match = err.as_code() == expected_code
+        || matches!(
+            err,
+            IggyError::HttpResponseError(400, reason)
+            if reason.contains(&format!("\"id\":{}", expected_code))
+        );
+
+    assert!(
+        is_match,
+        "Expected error code {}, got {:?}",
+        expected_code, err
+    );
+}
+
+pub async fn run(harness: &TestHarness) {
+    let client = harness
+        .root_client()
+        .await
+        .expect("Failed to get root client");
+
+    let stream = Identifier::named(STREAM_NAME).unwrap();
+    let topic = Identifier::named(TOPIC_NAME).unwrap();
+
+    let joined_group = initialize(&client, &stream, &topic).await;
+
+    let consumer = Consumer {
+        kind: ConsumerKind::Consumer,
+        id: Identifier::named(CONSUMER_NAME).unwrap(),
+    };
+    let consumer_group = Consumer {
+        kind: ConsumerKind::ConsumerGroup,
+        id: Identifier::named(CONSUMER_GROUP_NAME).unwrap(),
+    };
+
+    // 1. Empty partition scenarios
+    test_offset_for_empty_partition(&client, &consumer, &stream, &topic).await;
+    if joined_group {
+        test_offset_for_empty_partition(&client, &consumer_group, &stream, 
&topic).await;
+    }
+
+    // 2. Send messages to create non-empty partition scenarios
+    send_messages(&client, &stream, &topic).await;
+
+    // 3. Non-empty partition scenarios
+    test_offset_for_non_empty_partition(&client, &consumer, &stream, 
&topic).await;
+    if joined_group {
+        test_offset_for_non_empty_partition(&client, &consumer_group, &stream, 
&topic).await;
+    }
+
+    cleanup(&client, &stream, &topic, joined_group).await;
+}
+
+async fn initialize(client: &IggyClient, stream: &Identifier, topic: 
&Identifier) -> bool {
+    client.create_stream(STREAM_NAME).await.unwrap();
+    client
+        .create_topic(
+            stream,
+            TOPIC_NAME,
+            PARTITIONS_COUNT,
+            Default::default(),
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::Unlimited,
+        )
+        .await
+        .unwrap();
+
+    client
+        .create_consumer_group(stream, topic, CONSUMER_GROUP_NAME)
+        .await
+        .unwrap();
+
+    let join_result = client
+        .join_consumer_group(
+            stream,
+            topic,
+            &Identifier::named(CONSUMER_GROUP_NAME).unwrap(),
+        )
+        .await;
+    match join_result {
+        Ok(_) => true,
+        Err(e) => {
+            assert_eq!(e.as_code(), IggyError::FeatureUnavailable.as_code());
+            false
+        }
+    }
+}
+
+async fn cleanup(client: &IggyClient, stream: &Identifier, topic: &Identifier, 
joined_group: bool) {
+    if joined_group {
+        client
+            .leave_consumer_group(
+                stream,
+                topic,
+                &Identifier::named(CONSUMER_GROUP_NAME).unwrap(),
+            )
+            .await
+            .unwrap();
+    }
+
+    client
+        .delete_consumer_group(
+            stream,
+            topic,
+            &Identifier::named(CONSUMER_GROUP_NAME).unwrap(),
+        )
+        .await
+        .unwrap();
+
+    client.delete_stream(stream).await.unwrap();
+    assert_clean_system(client).await;
+}
+
+async fn test_offset_for_empty_partition(
+    client: &IggyClient,
+    consumer: &Consumer,
+    stream: &Identifier,
+    topic: &Identifier,
+) {
+    // Attempt to store offset 0 on a newly created, empty partition.
+    // This operation should fail to prevent prematurely advancing the offset.
+    let err = client
+        .store_consumer_offset(consumer, stream, topic, Some(PARTITION_ID), 0)
+        .await
+        .expect_err("Storing offset 0 on empty partition should fail");
+    assert_invalid_offset_error(&err, 0);
+
+    // Attempt to store current_offset + 1 (should fail with specific error)
+    let err = client
+        .store_consumer_offset(consumer, stream, topic, Some(PARTITION_ID), 1)
+        .await
+        .expect_err("Storing offset 1 on empty partition should fail");
+    assert_invalid_offset_error(&err, 1);
+}
+
+async fn test_offset_for_non_empty_partition(
+    client: &IggyClient,
+    consumer: &Consumer,
+    stream: &Identifier,
+    topic: &Identifier,
+) {
+    // Attempt to store offset less than current_offset (should succeed)
+    // For 5 messages, offsets are 0, 1, 2, 3, 4, so current_offset is 4.
+    client
+        .store_consumer_offset(consumer, stream, topic, Some(PARTITION_ID), 2)
+        .await
+        .unwrap();
+
+    let offset_info = client
+        .get_consumer_offset(consumer, stream, topic, Some(PARTITION_ID))
+        .await
+        .unwrap()
+        .expect("Failed to get offset");
+    assert_eq!(offset_info.stored_offset, 2);
+
+    // Attempt to store exactly current_offset (should succeed)
+    let max_offset = (MESSAGES_COUNT - 1) as u64;
+    client
+        .store_consumer_offset(consumer, stream, topic, Some(PARTITION_ID), 
max_offset)
+        .await
+        .unwrap();
+
+    let offset_info = client
+        .get_consumer_offset(consumer, stream, topic, Some(PARTITION_ID))
+        .await
+        .unwrap()
+        .expect("Failed to get offset");
+    assert_eq!(offset_info.stored_offset, max_offset);
+
+    // Attempt to store current_offset + 1 (should fail)
+    let invalid_offset = max_offset + 1;
+    let err = client
+        .store_consumer_offset(consumer, stream, topic, Some(PARTITION_ID), 
invalid_offset)
+        .await
+        .expect_err("Storing offset max_offset + 1 should fail");
+    assert_invalid_offset_error(&err, invalid_offset);
+}
+
+async fn send_messages(client: &IggyClient, stream: &Identifier, topic: 
&Identifier) {
+    let mut messages = Vec::new();
+    for offset in 0..MESSAGES_COUNT {
+        messages.push(
+            IggyMessage::builder()
+                .id((offset + 1) as u128)
+                .payload(Bytes::from(format!("message {}", offset)))
+                .build()
+                .expect("Failed to create message"),
+        );
+    }
+    client
+        .send_messages(
+            stream,
+            topic,
+            &Partitioning::partition_id(PARTITION_ID),
+            &mut messages,
+        )
+        .await
+        .unwrap();
+}
diff --git a/core/integration/tests/server/scenarios/mod.rs 
b/core/integration/tests/server/scenarios/mod.rs
index 22b08f596..5a3b3f7a4 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -29,6 +29,7 @@ pub mod consumer_timestamp_polling_scenario;
 pub mod create_message_payload;
 pub mod cross_protocol_pat_scenario;
 pub mod encryption_scenario;
+pub mod invalid_consumer_offset_scenario;
 pub mod log_rotation_scenario;
 pub mod message_cleanup_scenario;
 pub mod message_headers_scenario;
diff --git a/core/server/src/shard/system/consumer_offsets.rs 
b/core/server/src/shard/system/consumer_offsets.rs
index c2572231b..ab287a834 100644
--- a/core/server/src/shard/system/consumer_offsets.rs
+++ b/core/server/src/shard/system/consumer_offsets.rs
@@ -51,16 +51,7 @@ impl IggyShard {
             return Err(IggyError::NotResolvedConsumer(consumer.id));
         };
 
-        if !self
-            .metadata
-            .partition_exists(topic.stream_id, topic.topic_id, partition_id)
-        {
-            return Err(IggyError::PartitionNotFound(
-                partition_id,
-                Identifier::numeric(topic.topic_id as u32).expect("valid topic 
id"),
-                Identifier::numeric(topic.stream_id as u32).expect("valid 
stream id"),
-            ));
-        }
+        self.validate_partition_offset(topic.stream_id, topic.topic_id, 
partition_id, offset)?;
 
         self.store_consumer_offset_base(
             topic.stream_id,
diff --git a/core/server/src/shard/system/utils.rs 
b/core/server/src/shard/system/utils.rs
index 1fb59a645..fc5f37a5f 100644
--- a/core/server/src/shard/system/utils.rs
+++ b/core/server/src/shard/system/utils.rs
@@ -127,6 +127,33 @@ impl IggyShard {
         Ok(())
     }
 
+    /// Validates that consumer_offset does not exceed actual partition offset.
+    pub fn validate_partition_offset(
+        &self,
+        stream_id: usize,
+        topic_id: usize,
+        partition_id: usize,
+        consumer_offset: u64,
+    ) -> Result<(), IggyError> {
+        let partition_stats = self
+            .metadata
+            .get_partition_stats_by_ids(stream_id, topic_id, partition_id)
+            .ok_or(IggyError::PartitionNotFound(
+                partition_id,
+                Identifier::numeric(topic_id as u32).expect("numeric 
identifier is always valid"),
+                Identifier::numeric(stream_id as u32).expect("numeric 
identifier is always valid"),
+            ))?;
+
+        // Also rejects storing any offset if the partition is completely 
empty (i.e., has never contained any messages).
+        if (partition_stats.messages_count_inconsistent() == 0
+            && partition_stats.current_offset() == 0)
+            || consumer_offset > partition_stats.current_offset()
+        {
+            return Err(IggyError::InvalidOffset(consumer_offset));
+        }
+        Ok(())
+    }
+
     /// Resolves consumer with partition ID for polling/offset operations.
     /// For consumer groups, all lookups happen under a single metadata read 
guard.
     pub fn resolve_consumer_with_partition_id(
diff --git 
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/ConsumerOffsetsClientBaseTest.java
 
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/ConsumerOffsetsClientBaseTest.java
index f80fc763a..633c84cd8 100644
--- 
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/ConsumerOffsetsClientBaseTest.java
+++ 
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/ConsumerOffsetsClientBaseTest.java
@@ -21,10 +21,13 @@ package org.apache.iggy.client.blocking;
 
 import org.apache.iggy.consumergroup.Consumer;
 import org.apache.iggy.identifier.ConsumerId;
+import org.apache.iggy.message.Message;
+import org.apache.iggy.message.Partitioning;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.math.BigInteger;
+import java.util.List;
 import java.util.Optional;
 
 import static org.apache.iggy.TestConstants.STREAM_NAME;
@@ -45,6 +48,10 @@ public abstract class ConsumerOffsetsClientBaseTest extends 
IntegrationTest {
 
     @Test
     void shouldGetConsumerOffset() {
+        // given. Send message to ensure partition is not empty so we can 
store offset 0
+        client.messages()
+                .sendMessages(STREAM_NAME, TOPIC_NAME, 
Partitioning.partitionId(0L), List.of(Message.of("test")));
+
         // when
         var consumer = new Consumer(Consumer.Kind.Consumer, 
ConsumerId.of(1223L));
         consumerOffsetsClient.storeConsumerOffset(STREAM_NAME, TOPIC_NAME, 
Optional.empty(), consumer, BigInteger.ZERO);

Reply via email to