mlevkov opened a new issue, #2928:
URL: https://github.com/apache/iggy/issues/2928
## Description
The `AutoCommitWhen::PollingMessages` strategy commits consumer group
offsets **before** `consume()` is called on the sink plugin. This means if
`consume()` fails (and the failure is already silently discarded per #2927),
the messages are permanently lost — the consumer group has already advanced
past them.
## Sequence
```
1. consumer.next() → messages received
2. Offsets committed → consumer group advances ← PROBLEM
3. process_messages() → calls sink consume() via FFI
4. consume() fails → messages already committed, lost forever
```
## Location
`core/connectors/runtime/src/sink.rs`:
- **Line 421**: Consumer configured with `AutoCommitWhen::PollingMessages`
```rust
.auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages))
```
- **Lines 266-344** (`consume_messages` loop): `consumer.next()` at line 272
triggers the auto-commit, but `process_messages()` doesn't execute until line
311.
## Impact
Combined with #2927 (consume return value discarded), **at-least-once
delivery is not achievable** with the current runtime for any sink connector.
If `consume()` fails and the process restarts, those messages will never be
retried because the offsets have already been committed.
## Suggested Fix
Change auto-commit strategy to `AutoCommitWhen::ConsumerStopped` or
`AutoCommit::Disabled`, and commit offsets **after** successful `consume()`:
```rust
// Option A: Commit after processing
.auto_commit(AutoCommit::Disabled)
// ... in the consume loop, after successful consume():
consumer.store_offset(offset).await?;
// Option B: Use AfterPollingMessages if available
.auto_commit(AutoCommit::When(AutoCommitWhen::AfterProcessing))
```
The exact API depends on the Iggy SDK's consumer group offset management
capabilities.
## Related
- #2927 (consume return value discarded)
- Discussed in #2901 (HTTP sink connector proposal)
- HTTP sink PR: #2925
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]